This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 5316e19  KYLIN-3925 Add reduce step for FilterRecommendCuboidDataJob & UpdateOldCuboidShardJob to avoid generating small hdfs files
5316e19 is described below
commit 5316e190acd85f52205b0849a0d8689004900c1b
Author: kyotoYaho <[email protected]>
AuthorDate: Mon Apr 1 15:45:34 2019 +0800
KYLIN-3925 Add reduce step for FilterRecommendCuboidDataJob & UpdateOldCuboidShardJob to avoid generating small hdfs files
---
.../apache/kylin/cube/common/RowKeySplitter.java | 5 ++
.../kylin/engine/mr/common/BatchConstants.java | 2 +
.../engine/mr/common/ConvergeCuboidDataUtil.java | 57 ++++++++++++++++++
.../engine/mr/common/CuboidStatsReaderUtil.java | 9 ++-
.../kylin/engine/mr/common/MapReduceUtil.java | 32 +++++++++++
.../mr/steps/ConvergeCuboidDataPartitioner.java | 67 ++++++++++++++++++++++
...aMapper.java => ConvergeCuboidDataReducer.java} | 48 +++++++---------
.../mr/steps/FilterRecommendCuboidDataJob.java | 22 +++----
.../mr/steps/FilterRecommendCuboidDataMapper.java | 43 ++------------
.../engine/mr/steps/UpdateOldCuboidShardJob.java | 20 +++----
.../mr/steps/UpdateOldCuboidShardMapper.java | 42 +-------------
11 files changed, 213 insertions(+), 134 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 264c7a5..1e09442 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -90,9 +90,14 @@ public class RowKeySplitter implements java.io.Serializable {
public long parseCuboid(byte[] bytes) {
+ return getCuboidId(bytes, enableSharding);
+ }
+
+ public static long getCuboidId(byte[] bytes, boolean enableSharding) {
int offset = enableSharding ? RowConstants.ROWKEY_SHARDID_LEN : 0;
return Bytes.toLong(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
}
+
/**
* @param bytes
* @return cuboid ID
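The extracted static helper reads the cuboid ID without constructing a RowKeySplitter. A minimal standalone sketch of its contract in Java; the offsets 2 and 8 are assumptions standing in for RowConstants.ROWKEY_SHARDID_LEN and RowConstants.ROWKEY_CUBOIDID_LEN:

    import java.nio.ByteBuffer;

    // Standalone illustration of getCuboidId(bytes, enableSharding); not the Kylin class itself.
    public class CuboidIdSketch {
        static long getCuboidId(byte[] bytes, boolean enableSharding) {
            int offset = enableSharding ? 2 : 0; // skip the assumed 2-byte shard prefix when sharding is on
            return ByteBuffer.wrap(bytes, offset, 8).getLong(); // 8-byte big-endian cuboid ID
        }

        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(2 + 8);
            buf.putShort((short) 5); // shard ID prefix
            buf.putLong(255L);       // cuboid ID
            System.out.println(getCuboidId(buf.array(), true)); // prints 255
        }
    }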
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 66da1b2..af11bb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -77,6 +77,8 @@ public interface BatchConstants {
String CFG_SHARD_NUM = "shard.num";
+ String CFG_CONVERGE_CUBOID_PARTITION_PARAM = "converge.cuboid.partition.param";
+
/**
* command line ARGuments
*/
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java
new file mode 100644
index 0000000..87f2a28
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataPartitioner;
+import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataReducer;
+
+public class ConvergeCuboidDataUtil {
+
+ public static void setupReducer(Job job, CubeSegment cubeSegment, Path output) throws IOException {
+ // Output
+ //// prevent to create zero-sized default output
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, output);
+
+ // Reducer
+ job.setReducerClass(ConvergeCuboidDataReducer.class);
+ job.setPartitionerClass(ConvergeCuboidDataPartitioner.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ Pair<Integer, Integer> numReduceTasks = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(cubeSegment);
+ job.setNumReduceTasks(numReduceTasks.getFirst());
+
+ int nBaseReduceTasks = numReduceTasks.getSecond();
+ boolean enableSharding = cubeSegment.isEnableSharding();
+ long baseCuboidId = cubeSegment.getCuboidScheduler().getBaseCuboidId();
+ String partiParams = enableSharding + "," + baseCuboidId + "," + nBaseReduceTasks;
+
+ job.getConfiguration().set(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM, partiParams);
+ }
+}
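setupReducer hands the partitioner its three inputs through one comma-joined configuration value. A small round-trip sketch (the values are made up; the "enableSharding,baseCuboidId,nBaseReduceTasks" layout is the one composed above and parsed in ConvergeCuboidDataPartitioner.setConf further below):

    // Round-trip sketch for CFG_CONVERGE_CUBOID_PARTITION_PARAM; illustrative values.
    public class PartitionParamSketch {
        public static void main(String[] args) {
            boolean enableSharding = true;
            long baseCuboidId = 255L;
            int nBaseReduceTasks = 3;

            // composed in ConvergeCuboidDataUtil.setupReducer
            String partiParams = enableSharding + "," + baseCuboidId + "," + nBaseReduceTasks;
            System.out.println(partiParams); // true,255,3

            // parsed back in ConvergeCuboidDataPartitioner.setConf
            String[] parts = partiParams.split(",");
            System.out.println(Boolean.parseBoolean(parts[0])); // true
            System.out.println(Long.parseLong(parts[1]));       // 255
            System.out.println(Integer.parseInt(parts[2]));     // 3
        }
    }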
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index ee615c3..2ef70f8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -135,6 +135,12 @@ public class CuboidStatsReaderUtil {
public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> cuboidIds, CubeSegment cubeSegment)
throws IOException {
+ Pair<Map<Long, Long>, Long> stats = readCuboidStatsWithSourceFromSegment(cuboidIds, cubeSegment);
+ return stats == null ? null : stats.getFirst();
+ }
+
+ public static Pair<Map<Long, Long>, Long> readCuboidStatsWithSourceFromSegment(Set<Long> cuboidIds,
+ CubeSegment cubeSegment) throws IOException {
if (cubeSegment == null) {
logger.warn("The cube segment can not be " + null);
return null;
@@ -157,7 +163,6 @@ public class CuboidStatsReaderUtil {
cuboidsWithStats.put(cuboid, rowEstimate);
}
}
- return cuboidsWithStats;
+ return new Pair<>(cuboidsWithStats, cubeStatsReader.sourceRowCount);
}
-
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 8fc26b4..ecde4aa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -20,9 +20,11 @@ package org.apache.kylin.engine.mr.common;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -31,6 +33,8 @@ import org.apache.kylin.job.exception.JobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
public class MapReduceUtil {
private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
@@ -112,7 +116,35 @@ public class MapReduceUtil {
for (Double cuboidSize : cubeSizeMap.values()) {
totalSizeInM += cuboidSize;
}
+ return getReduceTaskNum(totalSizeInM, kylinConfig);
+ }
+
+ // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
+ public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
+ long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
+
+ Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+ overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
+ overlapCuboids.add(baseCuboidId);
+
+ Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
+ .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
+ Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
+ cuboidStats.getSecond());
+ double totalSizeInM = 0;
+ for (Double cuboidSize : cubeSizeMap.values()) {
+ totalSizeInM += cuboidSize;
+ }
+
+ double baseSizeInM = cubeSizeMap.get(baseCuboidId);
+
+ KylinConfig kylinConfig = cubeSeg.getConfig();
+ int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
+ int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
+ return new Pair<>(nBase + nOther, nBase);
+ }
+ private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
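For intuition, the returned pair sizes two independent reducer pools and sums them. A toy calculation under the assumption (not shown in this hunk) that getReduceTaskNum grows roughly linearly with input megabytes; the 500 MB-per-reducer figure is invented for the example:

    // Toy sizing sketch for getConvergeCuboidDataReduceTaskNums; numbers are illustrative only.
    public class ReducerSplitSketch {
        public static void main(String[] args) {
            double baseSizeInM = 2048, totalSizeInM = 10240;
            double perReduceInputMB = 500; // assumed, cf. kylinConfig.getDefaultHadoopJobReducerInputMB()
            int nBase = (int) Math.max(1, Math.round(baseSizeInM / perReduceInputMB));
            int nOther = (int) Math.max(1, Math.round((totalSizeInM - baseSizeInM) / perReduceInputMB));
            // Pair<total, base>: 20 reducers in total, the first 4 reserved for the base cuboid
            System.out.println((nBase + nOther) + " total, " + nBase + " for base cuboid");
        }
    }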
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
new file mode 100644
index 0000000..605905a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import com.google.common.base.Preconditions;
+
+public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> implements Configurable {
+
+ private Random rand = new Random();
+
+ private Configuration conf;
+ private boolean enableSharding;
+ private long baseCuboidID;
+ private int numReduceBaseCuboid;
+
+ @Override
+ public int getPartition(Text key, Text value, int numReduceTasks) {
+ long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
+ // the first numReduceBaseCuboid are for base cuboid
+ if (cuboidID == baseCuboidID) {
+ return rand.nextInt(numReduceBaseCuboid);
+ } else {
+ return numReduceBaseCuboid + rand.nextInt(numReduceTasks - numReduceBaseCuboid);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ String partiParam = conf.get(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM);
+ String[] params = partiParam.split(",");
+ Preconditions.checkArgument(params.length >= 3);
+ this.enableSharding = Boolean.parseBoolean(params[0]);
+ this.baseCuboidID = Long.parseLong(params[1]);
+ this.numReduceBaseCuboid = Integer.parseInt(params[2]);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+}
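With those parameters, base-cuboid rows are confined to the first numReduceBaseCuboid partitions and all other cuboids are scattered over the remainder, so each pool writes its own appropriately sized files. A worked example with 10 reducers, 3 of them reserved for the base cuboid (cuboid IDs are made up):

    import java.util.Random;

    // Worked example mirroring ConvergeCuboidDataPartitioner.getPartition; not the Kylin class itself.
    public class PartitionSplitSketch {
        public static void main(String[] args) {
            Random rand = new Random();
            int numReduceTasks = 10, numReduceBaseCuboid = 3;
            long baseCuboidID = 255L;

            for (long cuboidID : new long[] { 255L, 7L, 12L }) {
                int p = (cuboidID == baseCuboidID)
                        ? rand.nextInt(numReduceBaseCuboid)               // base cuboid -> partitions [0, 3)
                        : numReduceBaseCuboid + rand.nextInt(numReduceTasks - numReduceBaseCuboid); // others -> [3, 10)
                System.out.println("cuboid " + cuboidID + " -> reducer " + p);
            }
        }
    }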
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
similarity index 69%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
index 2bb8349..78860bf 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@ import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
import java.io.IOException;
-import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -35,19 +34,16 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import com.google.common.base.Preconditions;
-
-public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Text, Text> {
+public class ConvergeCuboidDataReducer extends KylinReducer<Text, Text, Text, Text> {
private MultipleOutputs mos;
- private RowKeySplitter rowKeySplitter;
+ private boolean enableSharding;
private long baseCuboid;
- private Set<Long> recommendCuboids;
@Override
protected void doSetup(Context context) throws IOException {
@@ -59,30 +55,28 @@ public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Tex
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
- CubeManager cubeManager = CubeManager.getInstance(config);
- CubeInstance cube = cubeManager.getCube(cubeName);
- CubeSegment optSegment = cube.getSegmentById(segmentID);
- CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
-
- rowKeySplitter = new RowKeySplitter(originalSegment);
- baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+ CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+ CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
- recommendCuboids = cube.getCuboidsRecommend();
- Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
+ this.enableSharding = oldSegment.isEnableSharding();
+ this.baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
}
@Override
- public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidID = rowKeySplitter.split(key.getBytes());
- if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
- return;
+ public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
+
+ String baseOutputPath = cuboidID == baseCuboid ? PathNameCuboidBase : PathNameCuboidOld;
+ int n = 0;
+ for (Text value : values) {
+ mos.write(key, value, generateFileName(baseOutputPath));
+ n++;
}
-
- String baseOutputPath = PathNameCuboidOld;
- if (cuboidID == baseCuboid) {
- baseOutputPath = PathNameCuboidBase;
+ if (n > 1) {
+ throw new RuntimeException(
+ "multiple records share the same key in aggregated cuboid
data for cuboid " + cuboidID);
}
- mos.write(key, value, generateFileName(baseOutputPath));
}
@Override
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
index 2fbbc73..1b8bf58 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
@@ -19,21 +19,20 @@
package org.apache.kylin.engine.mr.steps;
import java.util.Locale;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.ConvergeCuboidDataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,26 +68,21 @@ public class FilterRecommendCuboidDataJob extends AbstractHadoopJob {
// Mapper
job.setMapperClass(FilterRecommendCuboidDataMapper.class);
-
- // Reducer
- job.setNumReduceTasks(0);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
// Input
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, input);
- // Output
- //// prevent to create zero-sized default output
- LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
- FileOutputFormat.setOutputPath(job, output);
+
+ // Reducer
+ ConvergeCuboidDataUtil.setupReducer(job, originalSegment, output);
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
// add metadata to distributed cache
- attachSegmentMetadataWithDict(originalSegment, job.getConfiguration());
+ attachSegmentMetadata(originalSegment, job.getConfiguration(), false, false);
this.deletePath(job.getConfiguration(), output);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
index 2bb8349..2fad4e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
@@ -18,18 +18,10 @@
package org.apache.kylin.engine.mr.steps;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
-
import java.io.IOException;
import java.util.Set;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -43,16 +35,13 @@ import com.google.common.base.Preconditions;
public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Text, Text> {
- private MultipleOutputs mos;
-
- private RowKeySplitter rowKeySplitter;
+ private boolean enableSharding;
private long baseCuboid;
private Set<Long> recommendCuboids;
@Override
protected void doSetup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
- mos = new MultipleOutputs(context);
String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
@@ -64,7 +53,7 @@ public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Tex
CubeSegment optSegment = cube.getSegmentById(segmentID);
CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
- rowKeySplitter = new RowKeySplitter(originalSegment);
+ enableSharding = originalSegment.isEnableSharding();
baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
recommendCuboids = cube.getCuboidsRecommend();
@@ -73,35 +62,11 @@ public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Tex
@Override
public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidID = rowKeySplitter.split(key.getBytes());
+ long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
return;
}
- String baseOutputPath = PathNameCuboidOld;
- if (cuboidID == baseCuboid) {
- baseOutputPath = PathNameCuboidBase;
- }
- mos.write(key, value, generateFileName(baseOutputPath));
- }
-
- @Override
- public void doCleanup(Context context) throws IOException, InterruptedException {
- mos.close();
-
- Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
- FileSystem fs = FileSystem.get(context.getConfiguration());
- if (!fs.exists(outputDirBase)) {
- fs.mkdirs(outputDirBase);
- SequenceFile
- .createWriter(context.getConfiguration(),
- SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
- SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
- .close();
- }
- }
-
- private String generateFileName(String subDir) {
- return subDir + "/part";
+ context.write(key, value);
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
index 80c483e..4012393 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
@@ -19,21 +19,20 @@
package org.apache.kylin.engine.mr.steps;
import java.util.Locale;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.ConvergeCuboidDataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,20 +70,15 @@ public class UpdateOldCuboidShardJob extends AbstractHadoopJob {
// Mapper
job.setMapperClass(UpdateOldCuboidShardMapper.class);
-
- // Reducer
- job.setNumReduceTasks(0);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
// Input
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, input);
- // Output
- //// prevent to create zero-sized default output
- LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
- FileOutputFormat.setOutputPath(job, output);
+
+ // Reducer
+ ConvergeCuboidDataUtil.setupReducer(job, originalSegment, output);
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
index 3d18bd6..ac1d499 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -18,17 +18,9 @@
package org.apache.kylin.engine.mr.steps;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
-
import java.io.IOException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.cube.CubeInstance;
@@ -50,9 +42,6 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardMapper.class);
- private MultipleOutputs mos;
- private long baseCuboid;
-
private CubeDesc cubeDesc;
private RowKeySplitter rowKeySplitter;
private RowKeyEncoderProvider rowKeyEncoderProvider;
@@ -64,7 +53,6 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
@Override
protected void doSetup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
- mos = new MultipleOutputs(context);
String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
@@ -76,7 +64,6 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
cubeDesc = cube.getDescriptor();
- baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
rowKeySplitter = new RowKeySplitter(oldSegment);
rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
@@ -90,11 +77,7 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
outputKey.set(newKeyBuf.array(), 0, fullKeySize);
- String baseOutputPath = PathNameCuboidOld;
- if (cuboidID == baseCuboid) {
- baseOutputPath = PathNameCuboidBase;
- }
- mos.write(outputKey, value, generateFileName(baseOutputPath));
+ context.write(outputKey, value);
}
private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) {
@@ -104,7 +87,8 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
int endIdx = startIdx + Long.bitCount(cuboid.getId());
int offset = 0;
for (int i = startIdx; i < endIdx; i++) {
- System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset, splitBuffers[i].length());
+ System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset,
+ splitBuffers[i].length());
offset += splitBuffers[i].length();
}
@@ -118,24 +102,4 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
return fullKeySize;
}
-
- @Override
- public void doCleanup(Context context) throws IOException, InterruptedException {
- mos.close();
-
- Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
- FileSystem fs = FileSystem.get(context.getConfiguration());
- if (!fs.exists(outputDirBase)) {
- fs.mkdirs(outputDirBase);
- SequenceFile
- .createWriter(context.getConfiguration(),
- SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
- SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
- .close();
- }
- }
-
- private String generateFileName(String subDir) {
- return subDir + "/part";
- }
}