This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
new d4f8eb2 KYLIN-3848 Flink cubing step : build by layer
d4f8eb2 is described below
commit d4f8eb28c05438daa68abfea28b4376d5b076668
Author: yanghua <[email protected]>
AuthorDate: Mon Mar 11 22:46:53 2019 +0800
KYLIN-3848 Flink cubing step : build by layer
---
.../org/apache/kylin/common/KylinConfigBase.java | 48 ++-
.../kylin/engine/flink/FlinkCubingByLayer.java | 434 +++++++++++++++++++++
.../org/apache/kylin/engine/flink/FlinkUtil.java | 156 ++++++++
3 files changed, 637 insertions(+), 1 deletion(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 81979dc..daae366 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -105,6 +105,22 @@ public abstract class KylinConfigBase implements Serializable {
return getKylinHome() + File.separator + "spark";
}
+ public static String getFlinkHome() {
+ String flinkHome = System.getenv("FLINK_HOME");
+ if (StringUtils.isNotEmpty(flinkHome)) {
+ logger.info("FLINK_HOME was set to {}", flinkHome);
+ return flinkHome;
+ }
+
+ flinkHome = System.getProperty("FLINK_HOME");
+ if (StringUtils.isNotEmpty(flinkHome)) {
+ logger.info("FLINK_HOME was set to {}", flinkHome);
+ return flinkHome;
+ }
+
+ return getKylinHome() + File.separator + "flink";
+ }
+
public static String getTempDir() {
return System.getProperty("java.io.tmpdir");
}
@@ -1228,6 +1244,8 @@ public abstract class KylinConfigBase implements Serializable {
r.put(0, "org.apache.kylin.engine.mr.MRBatchCubingEngine"); //IEngineAware.ID_MR_V1
r.put(2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2"); //IEngineAware.ID_MR_V2
r.put(4, "org.apache.kylin.engine.spark.SparkBatchCubingEngine2"); //IEngineAware.ID_SPARK
+ r.put(5, "org.apache.kylin.engine.flink.FlinkBatchCubingEngine2"); //IEngineAware.ID_FLINK
+
r.putAll(convertKeyToInteger(getPropertiesByPrefix("kylin.engine.provider.")));
return r;
}
@@ -1278,10 +1296,18 @@ public abstract class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.engine.spark-conf.");
}
+ public Map<String, String> getFlinkConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.flink-conf.");
+ }
+
public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
}
+ public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
+ return getPropertiesByPrefix("kylin.engine.flink-conf-" + configName + ".");
+ }
+
public double getDefaultHadoopJobReducerInputMB() {
return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
}
@@ -1353,18 +1379,34 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.engine.spark.additional-jars", "");
}
+ public String getFlinkAdditionalJars() {
+ return getOptional("kylin.engine.flink.additional-jars", "");
+ }
+
public float getSparkRDDPartitionCutMB() {
return Float.parseFloat(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
}
+ public float getFlinkPartitionCutMB() {
+ return Float.parseFloat(getOptional("kylin.engine.flink.partition-cut-mb", "10.0"));
+ }
+
public int getSparkMinPartition() {
return Integer.parseInt(getOptional("kylin.engine.spark.min-partition", "1"));
}
+ public int getFlinkMinPartition() {
+ return Integer.parseInt(getOptional("kylin.engine.flink.min-partition", "1"));
+ }
+
public int getSparkMaxPartition() {
return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
}
+ public int getFlinkMaxPartition() {
+ return Integer.parseInt(getOptional("kylin.engine.flink.max-partition", "5000"));
+ }
+
public String getSparkStorageLevel() {
return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
}
@@ -1372,6 +1414,10 @@ public abstract class KylinConfigBase implements Serializable {
public boolean isSparkSanityCheckEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
}
+
+ public boolean isFlinkSanityCheckEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
+ }
// ============================================================================
// ENGINE.LIVY
@@ -1396,7 +1442,7 @@ public abstract class KylinConfigBase implements Serializable {
public Map<String, String> getLivyMap() {
return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
}
-
+
// ============================================================================
// QUERY
// ============================================================================
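The KylinConfigBase additions mirror the existing Spark entries one-for-one: everything under the "kylin.engine.flink-conf." prefix is collected by getFlinkConfigOverride() and is meant to be handed to the Flink job unchanged, while the partition-cut/min/max and sanity-check getters feed the cubing job below. A minimal sketch of how a caller could consume the override map; the class name and wiring here are illustrative only, not part of this commit:

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.kylin.common.KylinConfig;

public class FlinkConfOverrideExample {

    // Copy every "kylin.engine.flink-conf.<key>=<value>" entry from kylin.properties
    // into a Flink Configuration as "<key>=<value>".
    public static Configuration buildFlinkConf() {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        Configuration flinkConf = new Configuration();
        for (Map.Entry<String, String> entry : kylinConfig.getFlinkConfigOverride().entrySet()) {
            flinkConf.setString(entry.getKey(), entry.getValue());
        }
        return flinkConf;
    }
}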
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
new file mode 100644
index 0000000..ba1f233
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Flink application to build cube with the "by-layer" algorithm. It only supports source data from Hive; metadata is stored in HBase.
+ */
+public class FlinkCubingByLayer extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(FlinkCubingByLayer.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+ .withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+
+ private Options options;
+
+ public FlinkCubingByLayer() {
+ options = new Options();
+ options.addOption(OPTION_INPUT_TABLE);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_OUTPUT_PATH);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+ String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+ Job job = Job.getInstance();
+
+ HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+ KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+
+ logger.info("DataSet input path : {}", inputPath);
+ logger.info("DataSet output path : {}", outputPath);
+
+ FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+ int countMeasureIndex = 0;
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isCount()) {
+ break;
+ } else {
+ countMeasureIndex++;
+ }
+ }
+
+ final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+ boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
+ boolean allNormalMeasure = true;
+ for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+ needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
+ allNormalMeasure = allNormalMeasure && needAggr[i];
+ }
+
+ logger.info("All measures are normal (agg on all cuboids)? : " + allNormalMeasure);
+
+ boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String[]> hiveDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
+
+ DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet = hiveDataSet.map(
+ new EncodeBaseCuboidMapFunction(cubeName, segmentId, metaUrl, sConf));
+
+ Long totalCount = 0L;
+ if (envConfig.isFlinkSanityCheckEnabled()) {
+ totalCount = encodedBaseDataSet.count();
+ }
+
+ final BaseCuboidReduceFunction baseCuboidReducerFunction = new BaseCuboidReduceFunction(cubeName, metaUrl, sConf);
+
+ BaseCuboidReduceFunction reducerFunction = baseCuboidReducerFunction;
+ if (!allNormalMeasure) {
+ reducerFunction = new CuboidReduceFunction(cubeName, metaUrl, sConf, needAggr);
+ }
+
+ final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+ DataSet<Tuple2<ByteArray, Object[]>>[] allDataSets = new DataSet[totalLevels + 1];
+ int level = 0;
+ int partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
+
+ // aggregate to calculate base cuboid
+ allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction).setParallelism(partition);
+
+ sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
+
+ CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
+
+ for (level = 1; level <= totalLevels; level++) {
+ partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
+
+ allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction).setParallelism(partition);
+ if (envConfig.isFlinkSanityCheckEnabled()) {
+ sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
+ }
+ sinkToHDFS(allDataSets[level], metaUrl, cubeName, cubeSegment, outputPath, level, Job.getInstance(), envConfig);
+ }
+
+ env.execute("Cubing for : " + cubeName + " segment " + segmentId);
+ logger.info("Finished on calculating all level cuboids.");
+ }
+
+ private void sinkToHDFS(
+ final DataSet<Tuple2<ByteArray, Object[]>> dataSet,
+ final String metaUrl,
+ final String cubeName,
+ final CubeSegment cubeSeg,
+ final String hdfsBaseLocation,
+ final int level,
+ final Job job,
+ final KylinConfig kylinConfig) throws Exception {
+ final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+ FlinkUtil.setHadoopConfForCuboid(job, cubeSeg, metaUrl);
+
+ HadoopOutputFormat<Text, Text> hadoopOF =
+ new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
+
+ SequenceFileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+ dataSet.map(new RichMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<Text, Text>>() {
+
+ BufferedMeasureCodec codec;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new BufferedMeasureCodec(desc.getMeasures());
+ }
+ }
+
+ @Override
+ public Tuple2<Text, Text> map(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+ ByteBuffer valueBuf = codec.encode(tuple2.f1);
+ org.apache.hadoop.io.Text textResult = new org.apache.hadoop.io.Text();
+ textResult.set(valueBuf.array(), 0, valueBuf.position());
+ return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2.f0.array()), textResult);
+ }
+
+ }).output(hadoopOF);
+
+ logger.info("Persisting DataSet for level " + level + " into " + cuboidOutputPath);
+ }
+
+ private void sanityCheck(DataSet<Tuple2<ByteArray, Object[]>> dataSet, Long totalCount, int thisLevel,
+ CubeStatsReader cubeStatsReader, final int countMeasureIndex) throws Exception {
+ int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
+ Long count2 = getDataSetCountSum(dataSet, countMeasureIndex);
+ if (count2 != totalCount * thisCuboidNum) {
+ throw new IllegalStateException(
+ String.format(Locale.ROOT, "Sanity check failed, level %s, total count(*) is %s; cuboid number %s",
+ thisLevel, count2, thisCuboidNum));
+ } else {
+ logger.info("Sanity check succeeded for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum));
+ }
+ }
+
+ private Long getDataSetCountSum(DataSet<Tuple2<ByteArray, Object[]>> dataSet, final int countMeasureIndex) throws Exception {
+ // sum the count(*) measure (field 1) across all records, then read back the single aggregated value
+ Long count = dataSet.map((Tuple2<ByteArray, Object[]> byteArrayTuple2) ->
+ new Tuple2<>(byteArrayTuple2.f0, (Long) byteArrayTuple2.f1[countMeasureIndex])
+ ).sum(1).collect().get(0).f1;
+
+ return count;
+ }
+
+ /**
+ * A map function used to encode the base cuboid.
+ */
+ private static class EncodeBaseCuboidMapFunction extends RichMapFunction<String[], Tuple2<ByteArray, Object[]>> {
+
+ private BaseCuboidBuilder baseCuboidBuilder = null;
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+
+ public EncodeBaseCuboidMapFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
+ EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+ long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
+ baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+ AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+ MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+ }
+ }
+
+ @Override
+ public Tuple2<ByteArray, Object[]> map(String[] rowArray) throws Exception {
+ baseCuboidBuilder.resetAggrs();
+ byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+ Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+ return new Tuple2<>(new ByteArray(rowKey), result);
+ }
+ }
+
+ /**
+ * A reduce function used to aggregate the base cuboid.
+ */
+ private static class BaseCuboidReduceFunction extends RichReduceFunction<Tuple2<ByteArray, Object[]>> {
+
+ protected String cubeName;
+ protected String metaUrl;
+ protected CubeDesc cubeDesc;
+ protected int measureNum;
+ protected MeasureAggregators aggregators;
+ protected SerializableConfiguration conf;
+
+ public BaseCuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ cubeDesc = cubeInstance.getDescriptor();
+ aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+ measureNum = cubeDesc.getMeasures().size();
+ }
+ }
+
+ @Override
+ public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1,
+ Tuple2<ByteArray, Object[]> input2) throws Exception {
+ Object[] result = new Object[measureNum];
+ aggregators.aggregate(input1.f1, input2.f1, result);
+ return new Tuple2<>(input1.f0, result);
+ }
+
+ }
+
+ /**
+ * A reduce function that aggregates measures selectively, based on a boolean flag array.
+ */
+ private static class CuboidReduceFunction extends BaseCuboidReduceFunction {
+
+ private boolean[] needAgg;
+
+ public CuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
+ super(cubeName, metaUrl, conf);
+ this.needAgg = needAgg;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ @Override
+ public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1,
+ Tuple2<ByteArray, Object[]> input2) throws Exception {
+ Object[] result = new Object[measureNum];
+ aggregators.aggregate(input1.f1, input2.f1, result, needAgg);
+ return new Tuple2<>(input1.f0, result);
+ }
+ }
+
+ /**
+ * A flatmap function which computes a cuboid's child cuboids and emits them downstream.
+ */
+ private static class CuboidFlatMapFunction extends RichFlatMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private CubeSegment cubeSegment;
+ private CubeDesc cubeDesc;
+ private NDCuboidBuilder ndCuboidBuilder;
+ private RowKeySplitter rowKeySplitter;
+ private SerializableConfiguration conf;
+
+ public CuboidFlatMapFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+ this.cubeDesc = cubeInstance.getDescriptor();
+ this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+ this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+ }
+ }
+
+ @Override
+ public void flatMap(Tuple2<ByteArray, Object[]> tuple2, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+ byte[] key = tuple2.f0.array();
+ long cuboidId = rowKeySplitter.parseCuboid(key);
+ final List<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
+
+ // if still empty or null
+ if (myChildren == null || myChildren.size() == 0) {
+ return;
+ }
+ rowKeySplitter.split(key);
+ final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+
+ for (Long child : myChildren) {
+ Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
+ ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+
+ collector.collect(new Tuple2<>(result, tuple2.f1));
+ }
+ }
+ }
+
+}
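The execute() flow above encodes the Hive flat-table rows into the base cuboid, reduces by key, then walks the cuboid tree level by level (flatMap to child cuboids, reduce, sink each layer to HDFS as sequence files). The class is driven through the commons-cli options it declares; the sketch below shows how a driver might invoke it. All argument values are placeholders, and it assumes AbstractApplication exposes the execute(String[]) entry point and that BatchConstants.ARG_CUBE_NAME / ARG_INPUT / ARG_OUTPUT resolve to "cubename" / "input" / "output":

public class FlinkCubingByLayerDriver {

    public static void main(String[] args) throws Exception {
        // Placeholder values only; the switch names mirror the Option definitions above.
        String[] jobArgs = new String[] {
                "-cubename", "sample_cube",
                "-segmentId", "00000000-0000-0000-0000-000000000000",
                "-metaUrl", "kylin_metadata@hdfs,path=hdfs:///kylin/metadata",
                "-hiveTable", "default.kylin_intermediate_sample_cube",
                "-input", "hdfs:///kylin/flat_table/sample_cube",
                "-output", "hdfs:///kylin/cuboid/sample_cube"
        };
        // AbstractApplication parses the options and dispatches to execute(OptionsHelper).
        new FlinkCubingByLayer().execute(jobArgs);
    }
}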
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
new file mode 100644
index 0000000..4473a44
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.StorageFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A helper class containing utility methods used by the Flink cube engine.
+ */
+public class FlinkUtil {
+
+ public static IFlinkInput.IFlinkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ return SourceManager.createEngineAdapter(seg, IFlinkInput.class).getBatchCubingInputSide(flatDesc);
+ }
+
+ public static IFlinkOutput.IFlinkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IFlinkOutput.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static IFlinkOutput.IFlinkBatchMergeOutputSide getBatchMergeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IFlinkOutput.class).getBatchMergeOutputSide(seg);
+ }
+
+ public static IFlinkInput.IFlinkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return SourceManager.createEngineAdapter(seg, IFlinkInput.class).getBatchMergeInputSide(seg);
+ }
+
+ public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+ }
+
+ public static DataSet parseInputPath(String inputPath, FileSystem fs, ExecutionEnvironment env, Class keyClass,
+ Class valueClass) throws IOException {
+ List<String> inputFolders = Lists.newArrayList();
+ Path inputHDFSPath = new Path(inputPath);
+ FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+ boolean hasDir = false;
+ for (FileStatus stat : fileStatuses) {
+ if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+ hasDir = true;
+ inputFolders.add(stat.getPath().toString());
+ }
+ }
+
+ if (!hasDir) {
+ return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, inputHDFSPath.toString()));
+ }
+
+ return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, StringUtil.join(inputFolders, ",")));
+ }
+
+
+ public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+ double baseCuboidSize = statsReader.estimateLayerSize(level);
+ float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
+ int partition = (int) (baseCuboidSize / partitionCutMB);
+ partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
+ partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
+ return partition;
+ }
+
+ public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
+ double totalSize = 0;
+ for (double x: statsReader.getCuboidSizeMap().values()) {
+ totalSize += x;
+ }
+ float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
+ int partition = (int) (totalSize / partitionCutMB);
+ partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
+ partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
+ return partition;
+ }
+
+ public static void setHadoopConfForCuboid(Job job, CubeSegment segment, String metaUrl) throws Exception {
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ }
+
+ public static void modifyFlinkHadoopConfiguration(Job job) throws Exception {
+ job.getConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+ job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
+ job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
+ job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
+ }
+
+ public static DataSet<String[]> readHiveRecords(boolean isSequenceFile, ExecutionEnvironment env, String inputPath, String hiveTable, Job job) throws IOException {
+ DataSet<String[]> recordDataSet;
+
+ if (isSequenceFile) {
+ recordDataSet = env
+ .createInput(HadoopInputs.readHadoopFile(new SequenceFileInputFormat(), BytesWritable.class, Text.class, inputPath, job),
+ TypeInformation.of(new TypeHint<Tuple2<BytesWritable, Text>>() {}))
+ .map(new MapFunction<Tuple2<BytesWritable, Text>, String[]>() {
+ @Override
+ public String[] map(Tuple2<BytesWritable, Text> tuple2) throws Exception {
+ String s = Bytes.toString(tuple2.f1.getBytes(), 0, tuple2.f1.getLength());
+ return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+ }
+ });
+ } else {
+ throw new UnsupportedOperationException("Currently, Flink does not support reading Hive tables directly.");
+ }
+
+ return recordDataSet;
+ }
+
+}
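FlinkUtil.estimateLayerPartitionNum() sizes each layer's reduce parallelism from the cube statistics: the layer's estimated size in MB divided by kylin.engine.flink.partition-cut-mb, clamped between the min and max partition settings. A worked example of that arithmetic, standalone and illustrative only (not part of the commit):

public class PartitionEstimateExample {

    // Same arithmetic as FlinkUtil.estimateLayerPartitionNum, with the inputs passed in directly.
    static int estimate(double layerSizeMB, float partitionCutMB, int minPartition, int maxPartition) {
        int partition = (int) (layerSizeMB / partitionCutMB);
        partition = Math.max(minPartition, partition);
        partition = Math.min(maxPartition, partition);
        return partition;
    }

    public static void main(String[] args) {
        // With the defaults partition-cut-mb=10, min-partition=1, max-partition=5000:
        System.out.println(estimate(128.0, 10.0f, 1, 5000)); // 128 MB layer -> 12 parallel reduce tasks
        System.out.println(estimate(3.0, 10.0f, 1, 5000));   // small layer still gets the minimum -> 1
    }
}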