[
https://issues.apache.org/jira/browse/KYLIN-3597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633329#comment-16633329
]
ASF GitHub Bot commented on KYLIN-3597:
---------------------------------------
shaofengshi closed pull request #278: KYLIN-3597 Close JavaSparkContext after
used.
URL: https://github.com/apache/kylin/pull/278
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 0b03f7077d..5037647a6f 100644
---
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -111,127 +111,128 @@ protected void execute(OptionsHelper optionsHelper)
throws Exception {
conf.set("spark.kryo.registrator",
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired",
"true").registerKryoClasses(kryoClassArray);
- JavaSparkContext sc = new JavaSparkContext(conf);
- SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set
dfs.replication=2 and enable compress
- KylinSparkJobListener jobListener = new KylinSparkJobListener();
- sc.sc().addSparkListener(jobListener);
-
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
- final KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-
- final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
- final CubeDesc cubeDesc =
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
- final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- final CubeStatsReader cubeStatsReader = new
CubeStatsReader(cubeSegment, envConfig);
-
- logger.info("Input path: {}", inputPath);
- logger.info("Output path: {}", outputPath);
-
- final Job job = Job.getInstance(sConf.get());
-
- SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
-
- final MeasureAggregators aggregators = new
MeasureAggregators(cubeDesc.getMeasures());
- final Function2 reduceFunction = new Function2<Object[], Object[],
Object[]>() {
- @Override
- public Object[] call(Object[] input1, Object[] input2) throws
Exception {
- Object[] measureObjs = new Object[input1.length];
- aggregators.aggregate(input1, input2, measureObjs);
- return measureObjs;
- }
- };
-
- final PairFunction convertTextFunction = new PairFunction<Tuple2<Text,
Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
- private transient volatile boolean initialized = false;
- BufferedMeasureCodec codec;
-
- @Override
- public Tuple2<org.apache.hadoop.io.Text,
org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
- throws Exception {
-
- if (initialized == false) {
- synchronized (SparkCubingMerge.class) {
- if (initialized == false) {
- synchronized (SparkCubingMerge.class) {
- if (initialized == false) {
- KylinConfig kylinConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- try
(KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
-
.setAndUnsetThreadLocalConfig(kylinConfig)) {
- CubeDesc desc =
CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
- codec = new
BufferedMeasureCodec(desc.getMeasures());
- initialized = true;
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set
dfs.replication=2 and enable compress
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ sc.sc().addSparkListener(jobListener);
+
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new
Path(outputPath));
+ final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
+ final KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc =
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+ final CubeSegment cubeSegment =
cubeInstance.getSegmentById(segmentId);
+ final CubeStatsReader cubeStatsReader = new
CubeStatsReader(cubeSegment, envConfig);
+
+ logger.info("Input path: {}", inputPath);
+ logger.info("Output path: {}", outputPath);
+
+ final Job job = Job.getInstance(sConf.get());
+
+ SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+ final MeasureAggregators aggregators = new
MeasureAggregators(cubeDesc.getMeasures());
+ final Function2 reduceFunction = new Function2<Object[], Object[],
Object[]>() {
+ @Override
+ public Object[] call(Object[] input1, Object[] input2) throws
Exception {
+ Object[] measureObjs = new Object[input1.length];
+ aggregators.aggregate(input1, input2, measureObjs);
+ return measureObjs;
+ }
+ };
+
+ final PairFunction convertTextFunction = new
PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text,
org.apache.hadoop.io.Text>() {
+ private transient volatile boolean initialized = false;
+ BufferedMeasureCodec codec;
+
+ @Override
+ public Tuple2<org.apache.hadoop.io.Text,
org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
+ throws Exception {
+
+ if (initialized == false) {
+ synchronized (SparkCubingMerge.class) {
+ if (initialized == false) {
+ synchronized (SparkCubingMerge.class) {
+ if (initialized == false) {
+ KylinConfig kylinConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ try
(KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+
.setAndUnsetThreadLocalConfig(kylinConfig)) {
+ CubeDesc desc =
CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new
BufferedMeasureCodec(desc.getMeasures());
+ initialized = true;
+ }
}
}
}
}
}
+ ByteBuffer valueBuf = codec.encode(tuple2._2());
+ byte[] encodedBytes = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, encodedBytes, 0,
valueBuf.position());
+ return new Tuple2<>(tuple2._1(), new
org.apache.hadoop.io.Text(encodedBytes));
+ }
+ };
+
+ final int totalLevels =
cubeSegment.getCuboidScheduler().getBuildLevel();
+ final String[] inputFolders = StringSplitter.split(inputPath, ",");
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ boolean isLegacyMode = false;
+ for (String inputFolder : inputFolders) {
+ Path baseCuboidPath = new
Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+ if (fs.exists(baseCuboidPath) == false) {
+ // doesn't exist sub folder, that means the merged cuboid
in one folder (not by layer)
+ isLegacyMode = true;
+ break;
}
- ByteBuffer valueBuf = codec.encode(tuple2._2());
- byte[] encodedBytes = new byte[valueBuf.position()];
- System.arraycopy(valueBuf.array(), 0, encodedBytes, 0,
valueBuf.position());
- return new Tuple2<>(tuple2._1(), new
org.apache.hadoop.io.Text(encodedBytes));
- }
- };
-
- final int totalLevels =
cubeSegment.getCuboidScheduler().getBuildLevel();
- final String[] inputFolders = StringSplitter.split(inputPath, ",");
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- boolean isLegacyMode = false;
- for (String inputFolder : inputFolders) {
- Path baseCuboidPath = new
Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
- if (fs.exists(baseCuboidPath) == false) {
- // doesn't exist sub folder, that means the merged cuboid in
one folder (not by layer)
- isLegacyMode = true;
- break;
- }
- }
-
- if (isLegacyMode == true) {
- // merge all layer's cuboid at once, this might be hard for Spark
- List<JavaPairRDD<Text, Object[]>> mergingSegs =
Lists.newArrayListWithExpectedSize(inputFolders.length);
- for (int i = 0; i < inputFolders.length; i++) {
- String path = inputFolders[i];
- JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc,
Text.class, Text.class);
- CubeSegment sourceSegment = findSourceSegment(path,
cubeInstance);
- // re-encode with new dictionaries
- JavaPairRDD<Text, Object[]> newEcoddedRdd =
segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
- sourceSegment.getUuid(), cubeSegment.getUuid(),
metaUrl, sConf));
- mergingSegs.add(newEcoddedRdd);
}
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
- sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
- .reduceByKey(reduceFunction,
SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
-
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
-
- } else {
- // merge by layer
- for (int level = 0; level <= totalLevels; level++) {
- List<JavaPairRDD<Text, Object[]>> mergingSegs =
Lists.newArrayList();
+ if (isLegacyMode == true) {
+ // merge all layer's cuboid at once, this might be hard for
Spark
+ List<JavaPairRDD<Text, Object[]>> mergingSegs =
Lists.newArrayListWithExpectedSize(inputFolders.length);
for (int i = 0; i < inputFolders.length; i++) {
String path = inputFolders[i];
+ JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs,
sc, Text.class, Text.class);
CubeSegment sourceSegment = findSourceSegment(path,
cubeInstance);
- final String cuboidInputPath =
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
- JavaPairRDD<Text, Text> segRdd =
sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
// re-encode with new dictionaries
JavaPairRDD<Text, Object[]> newEcoddedRdd =
segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
sourceSegment.getUuid(), cubeSegment.getUuid(),
metaUrl, sConf));
mergingSegs.add(newEcoddedRdd);
}
- final String cuboidOutputPath =
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
- FileOutputFormat.setOutputPath(job, new
Path(cuboidOutputPath));
-
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
sc.union(mergingSegs.toArray(new
JavaPairRDD[mergingSegs.size()]))
- .reduceByKey(reduceFunction,
- SparkUtil.estimateLayerPartitionNum(level,
cubeStatsReader, envConfig))
+ .reduceByKey(reduceFunction,
SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+ } else {
+ // merge by layer
+ for (int level = 0; level <= totalLevels; level++) {
+ List<JavaPairRDD<Text, Object[]>> mergingSegs =
Lists.newArrayList();
+ for (int i = 0; i < inputFolders.length; i++) {
+ String path = inputFolders[i];
+ CubeSegment sourceSegment = findSourceSegment(path,
cubeInstance);
+ final String cuboidInputPath =
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+ JavaPairRDD<Text, Text> segRdd =
sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
+ // re-encode with new dictionaries
+ JavaPairRDD<Text, Object[]> newEcoddedRdd =
segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+ sourceSegment.getUuid(),
cubeSegment.getUuid(), metaUrl, sConf));
+ mergingSegs.add(newEcoddedRdd);
+ }
+
+ final String cuboidOutputPath =
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+ FileOutputFormat.setOutputPath(job, new
Path(cuboidOutputPath));
+
+ sc.union(mergingSegs.toArray(new
JavaPairRDD[mergingSegs.size()]))
+ .reduceByKey(reduceFunction,
+ SparkUtil.estimateLayerPartitionNum(level,
cubeStatsReader, envConfig))
+
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ }
}
+ // output the data size to console, job engine will parse and save
the metric
+ // please note: this mechanism won't work when
spark.submit.deployMode=cluster
+ logger.info("HDFS: Number of bytes written={}",
jobListener.metrics.getBytesWritten());
}
- // output the data size to console, job engine will parse and save the
metric
- // please note: this mechanism won't work when
spark.submit.deployMode=cluster
- logger.info("HDFS: Number of bytes written={}",
jobListener.metrics.getBytesWritten());
}
static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text,
Text>, Text, Object[]> {
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 043f479e7d..5cfd2d7ccb 100644
---
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -164,73 +164,75 @@ protected void execute(OptionsHelper optionsHelper)
throws Exception {
conf.set("spark.kryo.registrationRequired",
"true").registerKryoClasses(kryoClassArray);
KylinSparkJobListener jobListener = new KylinSparkJobListener();
- JavaSparkContext sc = new JavaSparkContext(conf);
- sc.sc().addSparkListener(jobListener);
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ sc.sc().addSparkListener(jobListener);
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new
Path(outputPath));
- final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
- KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
+ KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
- final Job job = Job.getInstance(sConf.get());
+ final Job job = Job.getInstance(sConf.get());
- final FactDistinctColumnsReducerMapping reducerMapping = new
FactDistinctColumnsReducerMapping(cubeInstance);
+ final FactDistinctColumnsReducerMapping reducerMapping = new
FactDistinctColumnsReducerMapping(
+ cubeInstance);
- logger.info("RDD Output path: {}", outputPath);
- logger.info("getTotalReducerNum: {}",
reducerMapping.getTotalReducerNum());
- logger.info("getCuboidRowCounterReducerNum: {}",
reducerMapping.getCuboidRowCounterReducerNum());
- logger.info("counter path {}", counterPath);
+ logger.info("RDD Output path: {}", outputPath);
+ logger.info("getTotalReducerNum: {}",
reducerMapping.getTotalReducerNum());
+ logger.info("getCuboidRowCounterReducerNum: {}",
reducerMapping.getCuboidRowCounterReducerNum());
+ logger.info("counter path {}", counterPath);
- boolean isSequenceFile =
JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+ boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE
+ .equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
- // calculate source record bytes size
- final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+ // calculate source record bytes size
+ final LongAccumulator bytesWritten = sc.sc().longAccumulator();
- final JavaRDD<String[]> recordRDD =
SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+ final JavaRDD<String[]> recordRDD =
SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
- JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD =
recordRDD.mapPartitionsToPair(
- new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf,
samplingPercent, bytesWritten));
+ JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD =
recordRDD.mapPartitionsToPair(
+ new FlatOutputFucntion(cubeName, segmentId, metaUrl,
sConf, samplingPercent, bytesWritten));
- JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD =
flatOutputRDD
- .groupByKey(new FactDistinctPartitioner(cubeName, metaUrl,
sConf, reducerMapping.getTotalReducerNum()));
+ JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD =
flatOutputRDD.groupByKey(
+ new FactDistinctPartitioner(cubeName, metaUrl, sConf,
reducerMapping.getTotalReducerNum()));
- JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD =
aggredRDD
- .mapPartitionsToPair(new MultiOutputFunction(cubeName,
metaUrl, sConf, samplingPercent));
+ JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD
= aggredRDD
+ .mapPartitionsToPair(new MultiOutputFunction(cubeName,
metaUrl, sConf, samplingPercent));
- // make each reducer output to respective dir
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN,
SequenceFileOutputFormat.class,
- NullWritable.class, Text.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT,
SequenceFileOutputFormat.class,
- NullWritable.class, ArrayPrimitiveWritable.class);
- MultipleOutputs.addNamedOutput(job,
BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
- LongWritable.class, BytesWritable.class);
- MultipleOutputs.addNamedOutput(job,
BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
- NullWritable.class, LongWritable.class);
+ // make each reducer output to respective dir
+ MultipleOutputs.addNamedOutput(job,
BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+ NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job,
BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+ NullWritable.class, ArrayPrimitiveWritable.class);
+ MultipleOutputs.addNamedOutput(job,
BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+ LongWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job,
BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+ NullWritable.class, LongWritable.class);
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
- FileOutputFormat.setCompressOutput(job, false);
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ FileOutputFormat.setCompressOutput(job, false);
- // prevent to create zero-sized default output
- LazyOutputFormat.setOutputFormatClass(job,
SequenceFileOutputFormat.class);
+ // prevent to create zero-sized default output
+ LazyOutputFormat.setOutputFormatClass(job,
SequenceFileOutputFormat.class);
+ MultipleOutputsRDD multipleOutputsRDD =
MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
- MultipleOutputsRDD multipleOutputsRDD =
MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
-
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+ long recordCount = recordRDD.count();
+ logger.info("Map input records={}", recordCount);
+ logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
- long recordCount = recordRDD.count();
- logger.info("Map input records={}", recordCount);
- logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT,
String.valueOf(recordCount));
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE,
String.valueOf(bytesWritten.value()));
- Map<String, String> counterMap = Maps.newHashMap();
- counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT,
String.valueOf(recordCount));
- counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE,
String.valueOf(bytesWritten.value()));
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(),
counterPath, counterMap);
- // save counter to hdfs
- HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath,
counterMap);
-
- HadoopUtil.deleteHDFSMeta(metaUrl);
+ HadoopUtil.deleteHDFSMeta(metaUrl);
+ }
}
static class FlatOutputFucntion implements
PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 37f957fe31..bbdeb85ecc 100644
---
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -124,36 +124,38 @@ protected void execute(OptionsHelper optionsHelper)
throws Exception {
conf.set("spark.kryo.registrator",
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired",
"true").registerKryoClasses(kryoClassArray);
- JavaSparkContext sc = new JavaSparkContext(conf);
- KylinSparkJobListener jobListener = new KylinSparkJobListener();
- sc.sc().addSparkListener(jobListener);
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ sc.sc().addSparkListener(jobListener);
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new
Path(dictOutputPath));
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new
Path(dictOutputPath));
- final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
- final KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
+ final KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
- final CubeDesc cubeDesc =
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+ final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc =
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
- logger.info("Dictionary output path: {}", dictOutputPath);
- logger.info("Statistics output path: {}", statOutputPath);
+ logger.info("Dictionary output path: {}", dictOutputPath);
+ logger.info("Statistics output path: {}", statOutputPath);
- final TblColRef[] tblColRefs =
cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
- final int columnLength = tblColRefs.length;
+ final TblColRef[] tblColRefs =
cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+ final int columnLength = tblColRefs.length;
- List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+ List<Integer> indexs =
Lists.newArrayListWithCapacity(columnLength);
- for (int i = 0; i <= columnLength; i++) {
- indexs.add(i);
- }
+ for (int i = 0; i <= columnLength; i++) {
+ indexs.add(i);
+ }
- JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+ JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength +
1);
- JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new
MergeDictAndStatsFunction(cubeName, metaUrl,
- segmentId, segmentIds.split(","), statOutputPath, tblColRefs,
sConf));
+ JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new
MergeDictAndStatsFunction(cubeName,
+ metaUrl, segmentId, segmentIds.split(","), statOutputPath,
tblColRefs, sConf));
- colToDictPathRDD.coalesce(1,
false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class,
SequenceFileOutputFormat.class);
+ colToDictPathRDD.coalesce(1,
false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class,
+ SequenceFileOutputFormat.class);
+ }
}
public static class MergeDictAndStatsFunction implements
PairFunction<Integer, Text, Text> {
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index e2d43ba1c2..96690d00de 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -134,112 +134,113 @@ protected void execute(OptionsHelper optionsHelper)
throws Exception {
conf.set("spark.kryo.registrationRequired",
"true").registerKryoClasses(kryoClassArray);
KylinSparkJobListener jobListener = new KylinSparkJobListener();
- JavaSparkContext sc = new JavaSparkContext(conf);
- sc.sc().addSparkListener(jobListener);
- final FileSystem fs =
partitionFilePath.getFileSystem(sc.hadoopConfiguration());
- if (!fs.exists(partitionFilePath)) {
- throw new IllegalArgumentException("File not exist: " +
partitionFilePath.toString());
- }
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ sc.sc().addSparkListener(jobListener);
+ final FileSystem fs =
partitionFilePath.getFileSystem(sc.hadoopConfiguration());
+ if (!fs.exists(partitionFilePath)) {
+ throw new IllegalArgumentException("File not exist: " +
partitionFilePath.toString());
+ }
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new
Path(outputPath));
+ final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
- final KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ final KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
- final CubeDesc cubeDesc = cubeInstance.getDescriptor();
- final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final CubeInstance cubeInstance =
CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment =
cubeInstance.getSegmentById(segmentId);
- final MeasureCodec inputCodec = new
MeasureCodec(cubeDesc.getMeasures());
- final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+ final MeasureCodec inputCodec = new
MeasureCodec(cubeDesc.getMeasures());
+ final List<KeyValueCreator> keyValueCreators =
Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc :
cubeDesc.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+ for (HBaseColumnFamilyDesc cfDesc :
cubeDesc.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ keyValueCreators.add(new KeyValueCreator(cubeDesc,
colDesc));
+ }
}
- }
- final int cfNum = keyValueCreators.size();
- final boolean quickPath = (keyValueCreators.size() == 1) &&
keyValueCreators.get(0).isFullCopy;
-
- logger.info("Input path: {}", inputPath);
- logger.info("Output path: {}", outputPath);
- // read partition split keys
- List<RowKeyWritable> keys = new ArrayList<>();
- try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
partitionFilePath, sc.hadoopConfiguration())) {
- RowKeyWritable key = new RowKeyWritable();
- Writable value = NullWritable.get();
- while (reader.next(key, value)) {
- keys.add(key);
- logger.info(" ------- split key: {}", key);
- key = new RowKeyWritable(); // important, new an object!
+ final int cfNum = keyValueCreators.size();
+ final boolean quickPath = (keyValueCreators.size() == 1) &&
keyValueCreators.get(0).isFullCopy;
+
+ logger.info("Input path: {}", inputPath);
+ logger.info("Output path: {}", outputPath);
+ // read partition split keys
+ List<RowKeyWritable> keys = new ArrayList<>();
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
partitionFilePath, sc.hadoopConfiguration())) {
+ RowKeyWritable key = new RowKeyWritable();
+ Writable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ keys.add(key);
+ logger.info(" ------- split key: {}", key);
+ key = new RowKeyWritable(); // important, new an object!
+ }
}
- }
- logger.info("There are {} split keys, totally {} hfiles", keys.size(),
(keys.size() + 1));
+ logger.info("There are {} split keys, totally {} hfiles",
keys.size(), (keys.size() + 1));
- //HBase conf
- logger.info("Loading HBase configuration from:{}", hbaseConfFile);
- FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+ //HBase conf
+ logger.info("Loading HBase configuration from:{}", hbaseConfFile);
+ FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
- Configuration hbaseJobConf = new Configuration();
- hbaseJobConf.addResource(confInput);
- hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile,
replication=3
- Job job = Job.getInstance(hbaseJobConf,
cubeSegment.getStorageLocationIdentifier());
+ Configuration hbaseJobConf = new Configuration();
+ hbaseJobConf.addResource(confInput);
+ hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile,
replication=3
+ Job job = Job.getInstance(hbaseJobConf,
cubeSegment.getStorageLocationIdentifier());
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
- JavaPairRDD<Text, Text> inputRDDs =
SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
- final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
- if (quickPath) {
- hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text,
Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text,
Text> textTextTuple2) throws Exception {
- KeyValue outputValue =
keyValueCreators.get(0).create(textTextTuple2._1,
- textTextTuple2._2.getBytes(), 0,
textTextTuple2._2.getLength());
- return new Tuple2<>(new
RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
- }
- });
- } else {
- hfilerdd = inputRDDs.flatMapToPair(new
PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Iterator<Tuple2<RowKeyWritable, KeyValue>>
call(Tuple2<Text, Text> textTextTuple2)
- throws Exception {
-
- List<Tuple2<RowKeyWritable, KeyValue>> result =
Lists.newArrayListWithExpectedSize(cfNum);
- Object[] inputMeasures = new
Object[cubeDesc.getMeasures().size()];
-
inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0,
textTextTuple2._2.getLength()),
- inputMeasures);
-
- for (int i = 0; i < cfNum; i++) {
- KeyValue outputValue =
keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
- result.add(new Tuple2<>(new
RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
- outputValue));
+ JavaPairRDD<Text, Text> inputRDDs =
SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+ final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+ if (quickPath) {
+ hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text,
Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text,
Text> textTextTuple2) throws Exception {
+ KeyValue outputValue =
keyValueCreators.get(0).create(textTextTuple2._1,
+ textTextTuple2._2.getBytes(), 0,
textTextTuple2._2.getLength());
+ return new Tuple2<>(new
RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
}
+ });
+ } else {
+ hfilerdd = inputRDDs.flatMapToPair(new
PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<RowKeyWritable, KeyValue>>
call(Tuple2<Text, Text> textTextTuple2)
+ throws Exception {
- return result.iterator();
- }
- });
- }
+ List<Tuple2<RowKeyWritable, KeyValue>> result =
Lists.newArrayListWithExpectedSize(cfNum);
+ Object[] inputMeasures = new
Object[cubeDesc.getMeasures().size()];
+
inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0,
textTextTuple2._2.getLength()),
+ inputMeasures);
- hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
- RowKeyWritable.RowKeyComparator.INSTANCE)
- .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>,
ImmutableBytesWritable, KeyValue>() {
- @Override
- public Tuple2<ImmutableBytesWritable, KeyValue> call(
- Tuple2<RowKeyWritable, KeyValue>
rowKeyWritableKeyValueTuple2) throws Exception {
- return new Tuple2<>(new
ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
- rowKeyWritableKeyValueTuple2._2);
+ for (int i = 0; i < cfNum; i++) {
+ KeyValue outputValue =
keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+ result.add(new Tuple2<>(new
RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+ outputValue));
+ }
+
+ return result.iterator();
}
- }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ });
+ }
- logger.info("HDFS: Number of bytes written={}",
jobListener.metrics.getBytesWritten());
+ hfilerdd.repartitionAndSortWithinPartitions(new
HFilePartitioner(keys),
+ RowKeyWritable.RowKeyComparator.INSTANCE)
+ .mapToPair(new PairFunction<Tuple2<RowKeyWritable,
KeyValue>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(
+ Tuple2<RowKeyWritable, KeyValue>
rowKeyWritableKeyValueTuple2) throws Exception {
+ return new Tuple2<>(new
ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+ rowKeyWritableKeyValueTuple2._2);
+ }
+ }).saveAsNewAPIHadoopDataset(job.getConfiguration());
- Map<String, String> counterMap = Maps.newHashMap();
- counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN,
String.valueOf(jobListener.metrics.getBytesWritten()));
+ logger.info("HDFS: Number of bytes written={}",
jobListener.metrics.getBytesWritten());
- // save counter to hdfs
- HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath,
counterMap);
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN,
String.valueOf(jobListener.metrics.getBytesWritten()));
+
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(),
counterPath, counterMap);
+ }
}
static class HFilePartitioner extends Partitioner {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Fix sonar reported static code issues
> -------------------------------------
>
> Key: KYLIN-3597
> URL: https://issues.apache.org/jira/browse/KYLIN-3597
> Project: Kylin
> Issue Type: Improvement
> Components: Others
> Reporter: Shaofeng SHI
> Priority: Major
> Fix For: v2.6.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)