This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a334ee4 KYLIN-3442 Fact distinct columns in Spark
a334ee4 is described below
commit a334ee4ff1f6609ab15bd7d4744206750e77ec21
Author: chao long <[email protected]>
AuthorDate: Fri Aug 24 15:07:17 2018 +0800
KYLIN-3442 Fact distinct columns in Spark
---
engine-spark/pom.xml | 41 +
.../kylin/engine/spark/MultipleOutputsRDD.scala | 114 +++
.../engine/spark/SparkBatchCubingJobBuilder2.java | 34 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 39 +-
.../apache/kylin/engine/spark/SparkExecutable.java | 6 +-
.../kylin/engine/spark/SparkFactDistinct.java | 866 +++++++++++++++++++++
.../org/apache/kylin/engine/spark/SparkUtil.java | 45 +-
kylin-it/pom.xml | 3 -
pom.xml | 21 +
9 files changed, 1123 insertions(+), 46 deletions(-)
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 700aeb5..ceb9337 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -100,6 +100,47 @@
<version>0.9.10</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
new file mode 100644
index 0000000..cb5458d
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{DataInputBuffer, Writable}
+import org.apache.hadoop.mapred.RawKeyValueIterator
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.counters.GenericCounter
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
+import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, TaskAttemptID, TaskType}
+import org.apache.hadoop.util.Progress
+import org.apache.spark._
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+class MultipleOutputsRDD[K, V](self: RDD[(String, (K, V, String))])
+ (implicit kt: ClassTag[K], vt: ClassTag[V]) extends Serializable {
+
+ def saveAsNewAPIHadoopDatasetWithMultipleOutputs(conf: Configuration) {
+ val hadoopConf = conf
+ val job = NewAPIHadoopJob.getInstance(hadoopConf)
+ val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ val jobtrackerID = formatter.format(new Date())
+ val stageId = self.id
+ val jobConfiguration = job.getConfiguration
+ val wrappedConf = new SerializableWritable(jobConfiguration)
+ val outfmt = job.getOutputFormatClass
+ val jobFormat = outfmt.newInstance
+
+ if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+ jobFormat.checkOutputSpecs(job)
+ }
+
+ val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V, String))]) => {
+ val config = wrappedConf.value
+
+ val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
+ context.attemptNumber)
+ val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
+ val format = outfmt.newInstance
+
+ format match {
+ case c: Configurable => c.setConf(wrappedConf.value)
+ case _ => ()
+ }
+
+ val committer = format.getOutputCommitter(hadoopContext)
+ committer.setupTask(hadoopContext)
+
+ val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]
+
+ val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId, new InputIterator(itr), new GenericCounter, new GenericCounter,
+ recordWriter, committer, new DummyReporter, null, kt.runtimeClass, vt.runtimeClass)
+
+ // use hadoop MultipleOutputs
+ val writer = new MultipleOutputs(taskInputOutputContext)
+
+ try {
+ while (itr.hasNext) {
+ val pair = itr.next()
+ writer.write(pair._1, pair._2._1, pair._2._2, pair._2._3)
+ }
+ } finally {
+ writer.close()
+ }
+ committer.commitTask(hadoopContext)
+ 1
+ }: Int
+
+ val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
+ val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
+ val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+ jobCommitter.setupJob(jobTaskContext)
+ self.context.runJob(self, writeShard)
+ jobCommitter.commitJob(jobTaskContext)
+ }
+
+ class InputIterator(itr: Iterator[_]) extends RawKeyValueIterator {
+ def getKey: DataInputBuffer = null
+ def getValue: DataInputBuffer = null
+ def getProgress: Progress = null
+ def next = itr.hasNext
+ def close() { }
+ }
+}
+
+object MultipleOutputsRDD {
+ def rddToMultipleOutputsRDD[K, V](rdd: JavaPairRDD[String, (Writable, Writable, String)]) = {
+ new MultipleOutputsRDD(rdd)
+ }
+}
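For readers wiring this class up from Java, the following minimal sketch (not part of the patch) shows the record shape it consumes: a pair of (named output, (key, value, baseOutputPath)). The example class name, the sample value and the "CUSTOMER_ID/" sub-directory are hypothetical, and the Hadoop Job is assumed to already carry the named-output, output-format and output-path settings that SparkFactDistinct configures further down in this commit.

import java.util.Collections;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.spark.MultipleOutputsRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
import scala.Tuple3;

public class MultipleOutputsRDDExample {
    // Writes one record to the "column" named output, landing under baseDir/CUSTOMER_ID/part-r-xxxxx.
    public static void write(JavaSparkContext sc, Job job) {
        Tuple2<String, Tuple3<Writable, Writable, String>> record = new Tuple2<String, Tuple3<Writable, Writable, String>>(
                BatchConstants.CFG_OUTPUT_COLUMN,
                new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text("some-value"), "CUSTOMER_ID/"));

        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD =
                sc.parallelizePairs(Collections.singletonList(record));

        MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD)
                .saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
    }
}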
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e545166..5fd7213 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -18,6 +18,9 @@
package org.apache.kylin.engine.spark;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.StringUtil;
@@ -32,9 +35,6 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
/**
*/
public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -61,7 +61,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStep(jobId));
+ result.addTask(createFactDistinctColumnsSparkStep(jobId));
if (isEnableUHCDictStep()) {
result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +87,32 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
+ public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
+ final SparkExecutable sparkExecutable = new SparkExecutable();
+ final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+
+ sparkExecutable.setClassName(SparkFactDistinct.class.getName());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_PATH.getOpt(), tablePath);
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_OUTPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_STATS_SAMPLING_PERCENT.getOpt(), String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+ sparkExecutable.setJobId(jobId);
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+ sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+
+ sparkExecutable.setJars(jars.toString());
+
+ return sparkExecutable;
+ }
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final SparkExecutable sparkExecutable = new SparkExecutable();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cb3af31..9f4ae34 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -27,13 +27,10 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
@@ -68,9 +65,6 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,37 +163,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
- final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
-
- if (isSequenceFile) {
- encodedBaseRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
- .map(new Function<Text, String[]>() {
- @Override
- public String[] call(Text text) throws Exception {
- String s = Bytes.toString(text.getBytes(), 0, text.getLength());
- return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
- }
- }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
- } else {
- SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
- final Dataset intermediateTable = sparkSession.table(hiveTable);
- encodedBaseRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
- @Override
- public String[] call(Row row) throws Exception {
- String[] result = new String[row.size()];
- for (int i = 0; i < row.size(); i++) {
- final Object o = row.get(i);
- if (o != null) {
- result[i] = o.toString();
- } else {
- result[i] = null;
- }
- }
- return result;
- }
- }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
-
- }
+ final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+ .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
Long totalCount = 0L;
if (envConfig.isSparkSanityCheckEnabled()) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 637382c..6122397 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -375,7 +375,11 @@ public class SparkExecutable extends AbstractExecutable {
Set<String> dumpList = new LinkedHashSet<>();
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
dumpList.addAll(segment.getDictionaryPaths());
- dumpList.add(segment.getStatisticsResourcePath());
+ ResourceStore rs = ResourceStore.getStore(segment.getConfig());
+ if (rs.exists(segment.getStatisticsResourcePath())) {
+ // cube statistics are not available for a new segment
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList,
(KylinConfigExt) segment.getConfig(),
this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
new file mode 100644
index 0000000..61e2e53
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -0,0 +1,866 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+public class SparkFactDistinct extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkFactDistinct.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+ .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+ .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+ .withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+
+ private Options options;
+
+ public SparkFactDistinct() {
+ options = new Options();
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_INPUT_TABLE);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+ String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+
+ Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
+
+ SparkConf conf = new SparkConf().setAppName("Fact distinct columns for:" + cubeName + " segment " + segmentId);
+ //serialization conf
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+ conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ sc.sc().addSparkListener(jobListener);
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+
+ final Job job = Job.getInstance(sConf.get());
+
+ final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+ logger.info("RDD Output path: {}", outputPath);
+ logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+ logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+
+ boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+ // calculate source record bytes size
+ final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+
+ final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+
+ JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+
+ JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+
+ JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD.mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+
+ // make each reducer output to its respective dir
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+ // prevent creating a zero-sized default output
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+ MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+
+ multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+
+ // only works in client mode, not when spark.submit.deployMode=cluster
+ logger.info("Map input records={}", recordRDD.count());
+ logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+
+ HadoopUtil.deleteHDFSMeta(metaUrl);
+ }
+
+ static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
+ private volatile transient boolean initialized = false;
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int samplingPercent;
+ private CuboidStatCalculator cuboidStatCalculator;
+ private FactDistinctColumnsReducerMapping reducerMapping;
+ private List<TblColRef> allCols;
+ private int[] columnIndex;
+ private DictColDeduper dictColDeduper;
+ private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap;
+ private ByteBuffer tmpbuf;
+ private LongAccumulator bytesWritten;
+
+ public FlatOutputFucntion(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, int samplingPercent, LongAccumulator bytesWritten) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaurl;
+ this.conf = conf;
+ this.samplingPercent = samplingPercent;
+ this.dimensionRangeInfoMap = Maps.newHashMap();
+ this.bytesWritten = bytesWritten;
+ }
+
+ private void init() {
+ KylinConfig kConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+ CubeInstance cubeInstance =
CubeManager.getInstance(kConfig).getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ CubeJoinedFlatTableEnrich intermediateTableDesc = new
CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment),
cubeDesc);
+
+ reducerMapping = new
FactDistinctColumnsReducerMapping(cubeInstance);
+ tmpbuf = ByteBuffer.allocate(4096);
+
+ int[] rokeyColumnIndexes =
intermediateTableDesc.getRowKeyColumnIndexes();
+
+ Long[] cuboidIds = getCuboidIds(cubeSegment);
+
+ Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds,
rokeyColumnIndexes.length);
+
+ boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
+
+ HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length,
cubeDesc.getConfig().getCubeStatsHLLPrecision());
+
+ cuboidStatCalculator = new
CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet,
isNewAlgorithm, cuboidsHLL);
+ allCols = reducerMapping.getAllDimDictCols();
+
+ initDictColDeduper(cubeDesc);
+ initColumnIndex(intermediateTableDesc);
+
+ initialized = true;
+ }
+
+ @Override
+ public Iterator<Tuple2<SelfDefineSortableKey, Text>> call(Iterator<String[]> rowIterator) throws Exception {
+ if (initialized == false) {
+ synchronized (SparkFactDistinct.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ List<String[]> rows = Lists.newArrayList(rowIterator);
+ List<Tuple2<SelfDefineSortableKey, Text>> result =
Lists.newArrayList();
+
+ int rowCount = 0;
+
+ for (String[] row : rows) {
+ bytesWritten.add(countSizeInBytes(row));
+
+ for (int i = 0; i < allCols.size(); i++) {
+ String fieldValue = row[columnIndex[i]];
+ if (fieldValue == null)
+ continue;
+
+ final DataType type = allCols.get(i).getType();
+
+ // for dict column, de-dup before writing the value; for non-dict dim column, hold until doCleanup()
+ if (dictColDeduper.isDictCol(i)) {
+ if (dictColDeduper.add(i, fieldValue)) {
+ addFieldValue(type, i, fieldValue, result);
+ }
+ } else {
+ DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+ if (old == null) {
+ old = new DimensionRangeInfo(fieldValue,
fieldValue);
+ dimensionRangeInfoMap.put(i, old);
+ } else {
+ old.setMax(type.getOrder().max(old.getMax(),
fieldValue));
+ old.setMin(type.getOrder().min(old.getMin(),
fieldValue));
+ }
+ }
+ }
+
+ if (rowCount % 100 < samplingPercent) {
+ cuboidStatCalculator.putRow(row);
+ }
+
+ if (rowCount % 100 == 0) {
+ dictColDeduper.resetIfShortOfMem();
+ }
+
+ rowCount++;
+ }
+
+ ByteBuffer hllBuf =
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+ HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+ HLLCounter hll;
+
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = cuboidsHLL[i];
+ tmpbuf.clear();
+ tmpbuf.put((byte)
FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+ tmpbuf.putLong(cuboidIds[i]);
+ Text outputKey = new Text();
+ Text outputValue = new Text();
+ SelfDefineSortableKey sortableKey = new
SelfDefineSortableKey();
+
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+
+ sortableKey.init(outputKey, (byte) 0);
+
+ result.add(new Tuple2<SelfDefineSortableKey,
Text>(sortableKey, outputValue));
+ }
+
+ for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+ DimensionRangeInfo rangeInfo =
dimensionRangeInfoMap.get(colIndex);
+ DataType dataType = allCols.get(colIndex).getType();
+ addFieldValue(dataType, colIndex, rangeInfo.getMin(), result);
+ addFieldValue(dataType, colIndex, rangeInfo.getMax(), result);
+ }
+
+ return result.iterator();
+ }
+
+ private boolean isUsePutRowKeyToHllNewAlgorithm(CubeDesc cubeDesc) {
+ boolean isUsePutRowKeyToHllNewAlgorithm;
+ if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+ isUsePutRowKeyToHllNewAlgorithm = false;
+ logger.info("Found KylinVersion: {}. Use old algorithm for
cuboid sampling.", cubeDesc.getVersion());
+ } else {
+ isUsePutRowKeyToHllNewAlgorithm = true;
+ logger.info(
+ "Found KylinVersion: {}. Use new algorithm for cuboid
sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+ cubeDesc.getVersion());
+ }
+ return isUsePutRowKeyToHllNewAlgorithm;
+ }
+
+ private Long[] getCuboidIds(CubeSegment cubeSegment) {
+ Set<Long> cuboidIdSet =
Sets.newHashSet(cubeSegment.getCuboidScheduler().getAllCuboidIds());
+ if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSegment)) {
+ // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated
+ // If the precondition for triggering cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids.
+ cuboidIdSet.addAll(cubeSegment.getCubeDesc().getMandatoryCuboids());
+ }
+
+ return cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+ }
+
+ private HLLCounter[] getInitCuboidsHLL(int cuboidSize, int
hllPrecision) {
+ HLLCounter[] cuboidsHLL = new HLLCounter[cuboidSize];
+ for (int i = 0; i < cuboidSize; i++) {
+ cuboidsHLL[i] = new HLLCounter(hllPrecision,
RegisterType.DENSE);
+ }
+ return cuboidsHLL;
+ }
+
+ private void initDictColDeduper(CubeDesc cubeDesc) {
+ // setup dict col deduper
+ dictColDeduper = new DictColDeduper();
+ Set<TblColRef> dictCols =
cubeDesc.getAllColumnsNeedDictionaryBuilt();
+ for (int i = 0; i < allCols.size(); i++) {
+ if (dictCols.contains(allCols.get(i)))
+ dictColDeduper.setIsDictCol(i);
+ }
+ }
+
+ private void initColumnIndex(CubeJoinedFlatTableEnrich
intermediateTableDesc) {
+ columnIndex = new int[allCols.size()];
+ for (int i = 0; i < allCols.size(); i++) {
+ TblColRef colRef = allCols.get(i);
+ int columnIndexOnFlatTbl =
intermediateTableDesc.getColumnIndex(colRef);
+ columnIndex[i] = columnIndexOnFlatTbl;
+ }
+ }
+
+ private void addFieldValue(DataType type, Integer colIndex, String value,
+ List<Tuple2<SelfDefineSortableKey, Text>> result) {
+ int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+ tmpbuf.clear();
+ byte[] valueBytes = Bytes.toBytes(value);
+ int size = valueBytes.length + 1;
+ if (size >= tmpbuf.capacity()) {
+ tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+ }
+ tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+ tmpbuf.put(valueBytes);
+
+ Text outputKey = new Text();
+ SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ sortableKey.init(outputKey, type);
+
+ result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, new Text()));
+
+ // log a few rows for troubleshooting
+ if (result.size() < 10) {
+ logger.info("Sample output: " + allCols.get(colIndex) + " '" +
value + "' => reducer " + reducerIndex);
+ }
+ }
+
+ private int countNewSize(int oldSize, int dataSize) {
+ int newSize = oldSize * 2;
+ while (newSize < dataSize) {
+ newSize = newSize * 2;
+ }
+ return newSize;
+ }
+
+ private int countSizeInBytes(String[] row) {
+ int size = 0;
+ for (String s : row) {
+ size += s == null ? 1 : StringUtil.utf8Length(s);
+ size++; // delimiter
+ }
+ return size;
+ }
+ }
+
+ static class CuboidStatCalculator {
+ private final int nRowKey;
+ private final int[] rowkeyColIndex;
+ private final Long[] cuboidIds;
+ private final Integer[][] cuboidsBitSet;
+ private volatile HLLCounter[] cuboidsHLL;
+
+ // for details of the new algorithm, please see KYLIN-2518
+ private final boolean isNewAlgorithm;
+ private final HashFunction hf;
+ private long[] rowHashCodesLong;
+
+ public CuboidStatCalculator(int[] rowkeyColIndex, Long[] cuboidIds,
Integer[][] cuboidsBitSet,
+ boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[]
cuboidsHLL) {
+ this.nRowKey = rowkeyColIndex.length;
+ this.rowkeyColIndex = rowkeyColIndex;
+ this.cuboidIds = cuboidIds;
+ this.cuboidsBitSet = cuboidsBitSet;
+ this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+ if (!isNewAlgorithm) {
+ this.hf = Hashing.murmur3_32();
+ } else {
+ rowHashCodesLong = new long[nRowKey];
+ this.hf = Hashing.murmur3_128();
+ }
+ this.cuboidsHLL = cuboidsHLL;
+ }
+
+ public void putRow(final String[] row) {
+ String[] copyRow = Arrays.copyOf(row, row.length);
+
+ if (isNewAlgorithm) {
+ putRowKeyToHLLNew(copyRow);
+ } else {
+ putRowKeyToHLLOld(copyRow);
+ }
+ }
+
+ private void putRowKeyToHLLOld(String[] row) {
+ //generate hash for each row key column
+ byte[][] rowHashCodes = new byte[nRowKey][];
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue != null) {
+ rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+ } else {
+ rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+ }
+ }
+
+ // use the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < cuboidsBitSet[i].length;
position++) {
+ hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+ }
+
+ cuboidsHLL[i].add(hc.hash().asBytes());
+ }
+ }
+
+ private void putRowKeyToHLLNew(String[] row) {
+ //generate hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue == null)
+ colValue = "0";
+ byte[] bytes = hc.putString(colValue).hash().asBytes();
+ rowHashCodesLong[i] = (Bytes.toLong(bytes) + i); // add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ }
+
+ // use the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ long value = 0;
+ for (int position = 0; position < cuboidsBitSet[i].length;
position++) {
+ value += rowHashCodesLong[cuboidsBitSet[i][position]];
+ }
+ cuboidsHLL[i].addHashDirectly(value);
+ }
+ }
+
+ public HLLCounter[] getHLLCounters() {
+ return cuboidsHLL;
+ }
+
+ public Long[] getCuboidIds() {
+ return cuboidIds;
+ }
+ }
+
+ static class FactDistinctPartitioner extends Partitioner {
+ private volatile transient boolean initialized = false;
+ private String cubeName;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int totalReducerNum;
+ private FactDistinctColumnsReducerMapping reducerMapping;
+
+ public FactDistinctPartitioner(String cubeName, String metaUrl,
SerializableConfiguration conf, int totalReducerNum) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ this.totalReducerNum = totalReducerNum;
+ }
+
+ private void init() {
+ KylinConfig kConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+ CubeInstance cubeInstance =
CubeManager.getInstance(kConfig).getCube(cubeName);
+ reducerMapping = new
FactDistinctColumnsReducerMapping(cubeInstance);
+
+ initialized = true;
+ }
+
+ @Override
+ public int numPartitions() {
+ return totalReducerNum;
+ }
+
+ @Override
+ public int getPartition(Object o) {
+ if (initialized == false) {
+ synchronized (SparkFactDistinct.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ SelfDefineSortableKey skey = (SelfDefineSortableKey) o;
+ Text key = skey.getText();
+ if (key.getBytes()[0] ==
FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+ Long cuboidId = Bytes.toLong(key.getBytes(), 1,
Bytes.SIZEOF_LONG);
+ return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+ } else {
+ return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
+ }
+ }
+ }
+
+ static class MultiOutputFunction implements
+ PairFlatMapFunction<Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>>, String, Tuple3<Writable, Writable, String>> {
+ private volatile transient boolean initialized = false;
+ private String DICT_FILE_POSTFIX = ".rldict";
+ private String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+ private String cubeName;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int samplingPercent;
+ private FactDistinctColumnsReducerMapping reducerMapping;
+ private int taskId;
+ private boolean isStatistics = false;
+ private long baseCuboidId;
+ private List<Long> baseCuboidRowCountInMappers;
+ private Map<Long, HLLCounter> cuboidHLLMap;
+ private TblColRef col;
+ private boolean buildDictInReducer;
+ private IDictionaryBuilder builder;
+ private int rowCount = 0;
+ private long totalRowsBeforeMerge = 0;
+ private KylinConfig cubeConfig;
+ private CubeDesc cubeDesc;
+ private String maxValue = null;
+ private String minValue = null;
+ private List<Tuple2<String, Tuple3<Writable, Writable, String>>>
result;
+
+ public MultiOutputFunction(String cubeName, String metaurl,
SerializableConfiguration conf, int samplingPercent) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaurl;
+ this.conf = conf;
+ this.samplingPercent = samplingPercent;
+ }
+
+ private void init() throws IOException {
+ taskId = TaskContext.getPartitionId();
+ KylinConfig kConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+ CubeInstance cubeInstance =
CubeManager.getInstance(kConfig).getCube(cubeName);
+ cubeDesc = cubeInstance.getDescriptor();
+ cubeConfig = cubeInstance.getConfig();
+ reducerMapping = new
FactDistinctColumnsReducerMapping(cubeInstance);
+
+ result = Lists.newArrayList();
+
+ if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+ // hll
+ isStatistics = true;
+ baseCuboidId =
cubeInstance.getCuboidScheduler().getBaseCuboidId();
+ baseCuboidRowCountInMappers = Lists.newArrayList();
+ cuboidHLLMap = Maps.newHashMap();
+
+ logger.info("Partition " + taskId + " handling stats");
+ } else {
+ // normal col
+ col = reducerMapping.getColForReducer(taskId);
+ Preconditions.checkNotNull(col);
+
+ // local build dict
+ buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
+ if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only
works with default dictionary builder
+ buildDictInReducer = false;
+ }
+
+ if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+ buildDictInReducer = false; // only works if this is the
only reducer of a dictionary column
+ }
+
+ if (buildDictInReducer) {
+ builder =
DictionaryGenerator.newDictionaryBuilder(col.getType());
+ builder.init(null, 0, null);
+ }
+ logger.info("Partition " + taskId + " handling column " + col
+ ", buildDictInReducer=" + buildDictInReducer);
+ }
+
+ initialized = true;
+ }
+
+ private void logAFewRows(String value) {
+ if (rowCount < 10) {
+ logger.info("Received value: " + value);
+ }
+ }
+
+ @Override
+ public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>>
call(
+ Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>>
tuple2Iterator) throws Exception {
+ if (initialized == false) {
+ synchronized (SparkFactDistinct.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ List<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuples =
Lists.newArrayList(tuple2Iterator);
+
+ for (Tuple2<SelfDefineSortableKey, Iterable<Text>> tuple : tuples)
{
+ Text key = tuple._1.getText();
+
+ if (isStatistics) {
+ // for hll
+ long cuboidId = Bytes.toLong(key.getBytes(), 1,
Bytes.SIZEOF_LONG);
+
+ for (Text value : tuple._2) {
+ HLLCounter hll = new
HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
+ ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0,
value.getLength());
+ hll.readRegisters(bf);
+
+ totalRowsBeforeMerge += hll.getCountEstimate();
+
+ if (cuboidId == baseCuboidId) {
+
baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+ }
+
+ if (cuboidHLLMap.get(cuboidId) != null) {
+ cuboidHLLMap.get(cuboidId).merge(hll);
+ } else {
+ cuboidHLLMap.put(cuboidId, hll);
+ }
+ }
+
+ } else {
+ String value = Bytes.toString(key.getBytes(), 1,
key.getLength() - 1);
+ logAFewRows(value);
+ // if dimension col, compute max/min value
+ if
(cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ if (minValue == null ||
col.getType().compare(minValue, value) > 0) {
+ minValue = value;
+ }
+ if (maxValue == null ||
col.getType().compare(maxValue, value) < 0) {
+ maxValue = value;
+ }
+ }
+
+ //if dict column
+ if
(cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+ if (buildDictInReducer) {
+ builder.addValue(value);
+ } else {
+ byte[] keyBytes = Bytes.copy(key.getBytes(), 1,
key.getLength() - 1);
+ // output written to baseDir/colName/-r-00000 (etc)
+ String fileName = col.getIdentity() + "/";
+ result.add(new Tuple2<String, Tuple3<Writable,
Writable, String>>(
+ BatchConstants.CFG_OUTPUT_COLUMN, new
Tuple3<Writable, Writable, String>(
+ NullWritable.get(), new Text(keyBytes),
fileName)));
+ }
+ }
+ }
+
+ rowCount++;
+ }
+
+ if (isStatistics) {
+ //output the hll info;
+ List<Long> allCuboids = Lists.newArrayList();
+ allCuboids.addAll(cuboidHLLMap.keySet());
+ Collections.sort(allCuboids);
+
+ logMapperAndCuboidStatistics(allCuboids); // for human check
+ outputStatistics(allCuboids, result);
+ } else {
+ //dimension col
+ if
(cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ outputDimRangeInfo(result);
+ }
+ // dic col
+ if (buildDictInReducer) {
+ Dictionary<String> dict = builder.build();
+ outputDict(col, dict, result);
+ }
+ }
+
+ return result.iterator();
+ }
+
+ private void logMapperAndCuboidStatistics(List<Long> allCuboids)
throws IOException {
+ logger.info("Cuboid number for task: " + taskId + "\t" +
allCuboids.size());
+ logger.info("Samping percentage: \t" + samplingPercent);
+ logger.info("The following statistics are collected based on
sampling data. ");
+ logger.info("Number of Mappers: " +
baseCuboidRowCountInMappers.size());
+
+ for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+ if (baseCuboidRowCountInMappers.get(i) > 0) {
+ logger.info("Base Cuboid in Mapper " + i + " row count: \t
" + baseCuboidRowCountInMappers.get(i));
+ }
+ }
+
+ long grantTotal = 0;
+ for (long i : allCuboids) {
+ grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+ logger.info("Cuboid " + i + " row count is: \t " +
cuboidHLLMap.get(i).getCountEstimate());
+ }
+
+ logger.info("Sum of row counts (before merge) is: \t " +
totalRowsBeforeMerge);
+ logger.info("After merge, the row count: \t " + grantTotal);
+ }
+
+ private void outputDimRangeInfo(List<Tuple2<String, Tuple3<Writable,
Writable, String>>> result) {
+ if (col != null && minValue != null) {
+ // output written to baseDir/colName/colName.dci-r-00000 (etc)
+ String dimRangeFileName = col.getIdentity() + "/" +
col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+ result.add(new Tuple2<String, Tuple3<Writable, Writable,
String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+ new Tuple3<Writable, Writable,
String>(NullWritable.get(), new Text(minValue.getBytes()),
+ dimRangeFileName)));
+ result.add(new Tuple2<String, Tuple3<Writable, Writable,
String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+ new Tuple3<Writable, Writable,
String>(NullWritable.get(), new Text(maxValue.getBytes()),
+ dimRangeFileName)));
+ logger.info("write dimension range info for col : " +
col.getName() + " minValue:" + minValue
+ + " maxValue:" + maxValue);
+ }
+ }
+
+ private void outputDict(TblColRef col, Dictionary<String> dict,
List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+ throws IOException, InterruptedException {
+ // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+ String dictFileName = col.getIdentity() + "/" + col.getName() +
DICT_FILE_POSTFIX;
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream outputStream = new DataOutputStream(baos)) {
+ outputStream.writeUTF(dict.getClass().getName());
+ dict.write(outputStream);
+
+ result.add(new Tuple2<String, Tuple3<Writable, Writable,
String>>(BatchConstants.CFG_OUTPUT_DICT,
+ new Tuple3<Writable, Writable,
String>(NullWritable.get(),
+ new
ArrayPrimitiveWritable(baos.toByteArray()), dictFileName)));
+ }
+ }
+
+ private void outputStatistics(List<Long> allCuboids,
List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+ throws IOException, InterruptedException {
+ // output written to baseDir/statistics/statistics-r-00000 (etc)
+ String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS +
"/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+ // mapper overlap ratio at key -1
+ long grandTotal = 0;
+
+ for (HLLCounter hll : cuboidHLLMap.values()) {
+ grandTotal += hll.getCountEstimate();
+ }
+
+ double mapperOverlapRatio = grandTotal == 0 ? 0 : (double)
totalRowsBeforeMerge / grandTotal;
+ result.add(new Tuple2<String, Tuple3<Writable, Writable,
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new
LongWritable(-1),
+ new
BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName)));
+
+ // mapper number at key -2
+ result.add(new Tuple2<String, Tuple3<Writable, Writable,
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new
LongWritable(-2),
+ new
BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())),
statisticsFileName)));
+
+ // sampling percentage at key 0
+ result.add(new Tuple2<String, Tuple3<Writable, Writable,
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new
LongWritable(0L),
+ new BytesWritable(Bytes.toBytes(samplingPercent)),
statisticsFileName)));
+
+ ByteBuffer valueBuf =
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+ for (long i : allCuboids) {
+ valueBuf.clear();
+ cuboidHLLMap.get(i).writeRegisters(valueBuf);
+ valueBuf.flip();
+
+ byte[] valueCopy = new byte[valueBuf.limit()];
+ System.arraycopy(valueBuf.array(), 0, valueCopy, 0,
valueBuf.limit());
+
+ result.add(new Tuple2<String, Tuple3<Writable, Writable,
String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new
LongWritable(i),
+ new BytesWritable(valueCopy,
valueCopy.length), statisticsFileName)));
+ }
+ }
+ }
+}
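As a reading aid (not part of the patch), the sketch below spells out the key layout the functions above exchange: a column record key is one reducer-id byte followed by the UTF-8 column value, while a statistics record key is a marker byte followed by the 8-byte cuboid id. The marker constant and class name here are stand-ins; the real marker is FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.hadoop.io.Text;

public class FactDistinctKeyLayoutExample {
    // Assumed marker value for illustration only.
    static final byte MARK_FOR_HLL_COUNTER = (byte) 0xFF;

    // Key for a normal column value: [1-byte reducer id][UTF-8 column value].
    static Text columnKey(int reducerId, String value) {
        byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + valueBytes.length);
        buf.put((byte) reducerId); // lowest byte of the reducer index
        buf.put(valueBytes);
        return new Text(Arrays.copyOf(buf.array(), buf.position()));
    }

    // Key for a cuboid HLL sketch: [marker byte][8-byte cuboid id].
    static Text statsKey(long cuboidId) {
        ByteBuffer buf = ByteBuffer.allocate(1 + Long.BYTES);
        buf.put(MARK_FOR_HLL_COUNTER); // routes the record to the statistics reducer
        buf.putLong(cuboidId);         // which cuboid this HLL sketch belongs to
        return new Text(Arrays.copyOf(buf.array(), buf.position()));
    }
}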
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 31eebc8..82a1a9b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -24,23 +24,31 @@ import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
public class SparkUtil {
@@ -130,6 +138,41 @@ public class SparkUtil {
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress",
"true");
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
"BLOCK");
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec",
"org.apache.hadoop.io.compress.DefaultCodec"); // or
org.apache.hadoop.io.compress.SnappyCodec
- }
+ }
+
+ public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+ JavaRDD<String[]> recordRDD;
+
+ if (isSequenceFile) {
+ recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+ .map(new Function<Text, String[]>() {
+ @Override
+ public String[] call(Text text) throws Exception {
+ String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+ return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+ }
+ });
+ } else {
+ SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+ final Dataset intermediateTable = sparkSession.table(hiveTable);
+ recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+ @Override
+ public String[] call(Row row) throws Exception {
+ String[] result = new String[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ final Object o = row.get(i);
+ if (o != null) {
+ result[i] = o.toString();
+ } else {
+ result[i] = null;
+ }
+ }
+ return result;
+ }
+ });
+ }
+
+ return recordRDD;
+ }
}
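A minimal usage sketch of the extracted helper (not part of the patch; the app name, input path and table name below are hypothetical placeholders):

import org.apache.kylin.engine.spark.SparkUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HiveRecordInputRDDExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("hiveRecordInputRDD demo");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // true -> read the flat table as SequenceFile records from HDFS;
            // false -> read the Hive intermediate table through SparkSession.
            JavaRDD<String[]> records = SparkUtil.hiveRecordInputRDD(true, sc,
                    "hdfs:///kylin/flat_table", "default.kylin_intermediate_table");
            System.out.println("record count = " + records.count());
        }
    }
}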
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 16bedb5..a3e7e68 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -307,17 +307,14 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
- <version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
- <version>2.11.0</version>
</dependency>
</dependencies>
diff --git a/pom.xml b/pom.xml
index d9b9efe..cd18659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
<spark.version>2.1.2</spark.version>
<kryo.version>4.0.0</kryo.version>
+ <!-- Scala versions -->
+ <scala.version>2.11.0</scala.version>
+
<!-- <reflections.version>0.9.10</reflections.version> -->
<!-- Calcite Version -->
@@ -895,6 +898,24 @@
<version>${tomcat.version}</version>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>