[ 
https://issues.apache.org/jira/browse/KYLIN-3597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633329#comment-16633329
 ] 

ASF GitHub Bot commented on KYLIN-3597:
---------------------------------------

shaofengshi closed pull request #278: KYLIN-3597 Close JavaSparkContext after 
used.
URL: https://github.com/apache/kylin/pull/278
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 0b03f7077d..5037647a6f 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -111,127 +111,128 @@ protected void execute(OptionsHelper optionsHelper) 
throws Exception {
         conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", 
"true").registerKryoClasses(kryoClassArray);
 
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set 
dfs.replication=2 and enable compress
-        KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        sc.sc().addSparkListener(jobListener);
-
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
-        final KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-
-        final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = 
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final CubeStatsReader cubeStatsReader = new 
CubeStatsReader(cubeSegment, envConfig);
-
-        logger.info("Input path: {}", inputPath);
-        logger.info("Output path: {}", outputPath);
-
-        final Job job = Job.getInstance(sConf.get());
-
-        SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
-
-        final MeasureAggregators aggregators = new 
MeasureAggregators(cubeDesc.getMeasures());
-        final Function2 reduceFunction = new Function2<Object[], Object[], 
Object[]>() {
-            @Override
-            public Object[] call(Object[] input1, Object[] input2) throws 
Exception {
-                Object[] measureObjs = new Object[input1.length];
-                aggregators.aggregate(input1, input2, measureObjs);
-                return measureObjs;
-            }
-        };
-
-        final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, 
Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-            private transient volatile boolean initialized = false;
-            BufferedMeasureCodec codec;
-
-            @Override
-            public Tuple2<org.apache.hadoop.io.Text, 
org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
-                    throws Exception {
-
-                if (initialized == false) {
-                    synchronized (SparkCubingMerge.class) {
-                        if (initialized == false) {
-                            synchronized (SparkCubingMerge.class) {
-                                if (initialized == false) {
-                                    KylinConfig kylinConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                                    try 
(KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
-                                            
.setAndUnsetThreadLocalConfig(kylinConfig)) {
-                                        CubeDesc desc = 
CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                                        codec = new 
BufferedMeasureCodec(desc.getMeasures());
-                                        initialized = true;
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set 
dfs.replication=2 and enable compress
+            KylinSparkJobListener jobListener = new KylinSparkJobListener();
+            sc.sc().addSparkListener(jobListener);
+
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new 
Path(outputPath));
+            final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = 
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeSegment cubeSegment = 
cubeInstance.getSegmentById(segmentId);
+            final CubeStatsReader cubeStatsReader = new 
CubeStatsReader(cubeSegment, envConfig);
+
+            logger.info("Input path: {}", inputPath);
+            logger.info("Output path: {}", outputPath);
+
+            final Job job = Job.getInstance(sConf.get());
+
+            SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+            final MeasureAggregators aggregators = new 
MeasureAggregators(cubeDesc.getMeasures());
+            final Function2 reduceFunction = new Function2<Object[], Object[], 
Object[]>() {
+                @Override
+                public Object[] call(Object[] input1, Object[] input2) throws 
Exception {
+                    Object[] measureObjs = new Object[input1.length];
+                    aggregators.aggregate(input1, input2, measureObjs);
+                    return measureObjs;
+                }
+            };
+
+            final PairFunction convertTextFunction = new 
PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, 
org.apache.hadoop.io.Text>() {
+                private transient volatile boolean initialized = false;
+                BufferedMeasureCodec codec;
+
+                @Override
+                public Tuple2<org.apache.hadoop.io.Text, 
org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
+                        throws Exception {
+
+                    if (initialized == false) {
+                        synchronized (SparkCubingMerge.class) {
+                            if (initialized == false) {
+                                synchronized (SparkCubingMerge.class) {
+                                    if (initialized == false) {
+                                        KylinConfig kylinConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                                        try 
(KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                                                
.setAndUnsetThreadLocalConfig(kylinConfig)) {
+                                            CubeDesc desc = 
CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                            codec = new 
BufferedMeasureCodec(desc.getMeasures());
+                                            initialized = true;
+                                        }
                                     }
                                 }
                             }
                         }
                     }
+                    ByteBuffer valueBuf = codec.encode(tuple2._2());
+                    byte[] encodedBytes = new byte[valueBuf.position()];
+                    System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, 
valueBuf.position());
+                    return new Tuple2<>(tuple2._1(), new 
org.apache.hadoop.io.Text(encodedBytes));
+                }
+            };
+
+            final int totalLevels = 
cubeSegment.getCuboidScheduler().getBuildLevel();
+            final String[] inputFolders = StringSplitter.split(inputPath, ",");
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            boolean isLegacyMode = false;
+            for (String inputFolder : inputFolders) {
+                Path baseCuboidPath = new 
Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+                if (fs.exists(baseCuboidPath) == false) {
+                    // doesn't exist sub folder, that means the merged cuboid 
in one folder (not by layer)
+                    isLegacyMode = true;
+                    break;
                 }
-                ByteBuffer valueBuf = codec.encode(tuple2._2());
-                byte[] encodedBytes = new byte[valueBuf.position()];
-                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, 
valueBuf.position());
-                return new Tuple2<>(tuple2._1(), new 
org.apache.hadoop.io.Text(encodedBytes));
-            }
-        };
-
-        final int totalLevels = 
cubeSegment.getCuboidScheduler().getBuildLevel();
-        final String[] inputFolders = StringSplitter.split(inputPath, ",");
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        boolean isLegacyMode = false;
-        for (String inputFolder : inputFolders) {
-            Path baseCuboidPath = new 
Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
-            if (fs.exists(baseCuboidPath) == false) {
-                // doesn't exist sub folder, that means the merged cuboid in 
one folder (not by layer)
-                isLegacyMode = true;
-                break;
-            }
-        }
-
-        if (isLegacyMode == true) {
-            // merge all layer's cuboid at once, this might be hard for Spark
-            List<JavaPairRDD<Text, Object[]>> mergingSegs = 
Lists.newArrayListWithExpectedSize(inputFolders.length);
-            for (int i = 0; i < inputFolders.length; i++) {
-                String path = inputFolders[i];
-                JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, 
Text.class, Text.class);
-                CubeSegment sourceSegment = findSourceSegment(path, 
cubeInstance);
-                // re-encode with new dictionaries
-                JavaPairRDD<Text, Object[]> newEcoddedRdd = 
segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
-                        sourceSegment.getUuid(), cubeSegment.getUuid(), 
metaUrl, sConf));
-                mergingSegs.add(newEcoddedRdd);
             }
 
-            FileOutputFormat.setOutputPath(job, new Path(outputPath));
-            sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
-                    .reduceByKey(reduceFunction, 
SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
-                    
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
-
-        } else {
-            // merge by layer
-            for (int level = 0; level <= totalLevels; level++) {
-                List<JavaPairRDD<Text, Object[]>> mergingSegs = 
Lists.newArrayList();
+            if (isLegacyMode == true) {
+                // merge all layer's cuboid at once, this might be hard for 
Spark
+                List<JavaPairRDD<Text, Object[]>> mergingSegs = 
Lists.newArrayListWithExpectedSize(inputFolders.length);
                 for (int i = 0; i < inputFolders.length; i++) {
                     String path = inputFolders[i];
+                    JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, 
sc, Text.class, Text.class);
                     CubeSegment sourceSegment = findSourceSegment(path, 
cubeInstance);
-                    final String cuboidInputPath = 
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
-                    JavaPairRDD<Text, Text> segRdd = 
sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
                     // re-encode with new dictionaries
                     JavaPairRDD<Text, Object[]> newEcoddedRdd = 
segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
                             sourceSegment.getUuid(), cubeSegment.getUuid(), 
metaUrl, sConf));
                     mergingSegs.add(newEcoddedRdd);
                 }
 
-                final String cuboidOutputPath = 
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
-                FileOutputFormat.setOutputPath(job, new 
Path(cuboidOutputPath));
-
+                FileOutputFormat.setOutputPath(job, new Path(outputPath));
                 sc.union(mergingSegs.toArray(new 
JavaPairRDD[mergingSegs.size()]))
-                        .reduceByKey(reduceFunction,
-                                SparkUtil.estimateLayerPartitionNum(level, 
cubeStatsReader, envConfig))
+                        .reduceByKey(reduceFunction, 
SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
                         
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+            } else {
+                // merge by layer
+                for (int level = 0; level <= totalLevels; level++) {
+                    List<JavaPairRDD<Text, Object[]>> mergingSegs = 
Lists.newArrayList();
+                    for (int i = 0; i < inputFolders.length; i++) {
+                        String path = inputFolders[i];
+                        CubeSegment sourceSegment = findSourceSegment(path, 
cubeInstance);
+                        final String cuboidInputPath = 
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+                        JavaPairRDD<Text, Text> segRdd = 
sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
+                        // re-encode with new dictionaries
+                        JavaPairRDD<Text, Object[]> newEcoddedRdd = 
segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                                sourceSegment.getUuid(), 
cubeSegment.getUuid(), metaUrl, sConf));
+                        mergingSegs.add(newEcoddedRdd);
+                    }
+
+                    final String cuboidOutputPath = 
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+                    FileOutputFormat.setOutputPath(job, new 
Path(cuboidOutputPath));
+
+                    sc.union(mergingSegs.toArray(new 
JavaPairRDD[mergingSegs.size()]))
+                            .reduceByKey(reduceFunction,
+                                    SparkUtil.estimateLayerPartitionNum(level, 
cubeStatsReader, envConfig))
+                            
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+                }
             }
+            // output the data size to console, job engine will parse and save 
the metric
+            // please note: this mechanism won't work when 
spark.submit.deployMode=cluster
+            logger.info("HDFS: Number of bytes written={}", 
jobListener.metrics.getBytesWritten());
         }
-        // output the data size to console, job engine will parse and save the 
metric
-        // please note: this mechanism won't work when 
spark.submit.deployMode=cluster
-        logger.info("HDFS: Number of bytes written={}", 
jobListener.metrics.getBytesWritten());
     }
 
     static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, 
Text>, Text, Object[]> {
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 043f479e7d..5cfd2d7ccb 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -164,73 +164,75 @@ protected void execute(OptionsHelper optionsHelper) 
throws Exception {
         conf.set("spark.kryo.registrationRequired", 
"true").registerKryoClasses(kryoClassArray);
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        sc.sc().addSparkListener(jobListener);
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new 
Path(outputPath));
 
-        final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
-        KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
 
-        final Job job = Job.getInstance(sConf.get());
+            final Job job = Job.getInstance(sConf.get());
 
-        final FactDistinctColumnsReducerMapping reducerMapping = new 
FactDistinctColumnsReducerMapping(cubeInstance);
+            final FactDistinctColumnsReducerMapping reducerMapping = new 
FactDistinctColumnsReducerMapping(
+                    cubeInstance);
 
-        logger.info("RDD Output path: {}", outputPath);
-        logger.info("getTotalReducerNum: {}", 
reducerMapping.getTotalReducerNum());
-        logger.info("getCuboidRowCounterReducerNum: {}", 
reducerMapping.getCuboidRowCounterReducerNum());
-        logger.info("counter path {}", counterPath);
+            logger.info("RDD Output path: {}", outputPath);
+            logger.info("getTotalReducerNum: {}", 
reducerMapping.getTotalReducerNum());
+            logger.info("getCuboidRowCounterReducerNum: {}", 
reducerMapping.getCuboidRowCounterReducerNum());
+            logger.info("counter path {}", counterPath);
 
-        boolean isSequenceFile = 
JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+            boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE
+                    .equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        // calculate source record bytes size
-        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+            // calculate source record bytes size
+            final LongAccumulator bytesWritten = sc.sc().longAccumulator();
 
-        final JavaRDD<String[]> recordRDD = 
SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+            final JavaRDD<String[]> recordRDD = 
SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
 
-        JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = 
recordRDD.mapPartitionsToPair(
-                new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, 
samplingPercent, bytesWritten));
+            JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = 
recordRDD.mapPartitionsToPair(
+                    new FlatOutputFucntion(cubeName, segmentId, metaUrl, 
sConf, samplingPercent, bytesWritten));
 
-        JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = 
flatOutputRDD
-                .groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, 
sConf, reducerMapping.getTotalReducerNum()));
+            JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = 
flatOutputRDD.groupByKey(
+                    new FactDistinctPartitioner(cubeName, metaUrl, sConf, 
reducerMapping.getTotalReducerNum()));
 
-        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = 
aggredRDD
-                .mapPartitionsToPair(new MultiOutputFunction(cubeName, 
metaUrl, sConf, samplingPercent));
+            JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD 
= aggredRDD
+                    .mapPartitionsToPair(new MultiOutputFunction(cubeName, 
metaUrl, sConf, samplingPercent));
 
-        // make each reducer output to respective dir
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, 
SequenceFileOutputFormat.class,
-                NullWritable.class, Text.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, 
SequenceFileOutputFormat.class,
-                NullWritable.class, ArrayPrimitiveWritable.class);
-        MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
-                LongWritable.class, BytesWritable.class);
-        MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
-                NullWritable.class, LongWritable.class);
+            // make each reducer output to respective dir
+            MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+                    NullWritable.class, Text.class);
+            MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+                    NullWritable.class, ArrayPrimitiveWritable.class);
+            MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+                    LongWritable.class, BytesWritable.class);
+            MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+                    NullWritable.class, LongWritable.class);
 
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
-        FileOutputFormat.setCompressOutput(job, false);
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            FileOutputFormat.setCompressOutput(job, false);
 
-        // prevent to create zero-sized default output
-        LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
+            // prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
 
+            MultipleOutputsRDD multipleOutputsRDD = 
MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
 
-        MultipleOutputsRDD multipleOutputsRDD = 
MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+            
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
 
-        
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+            long recordCount = recordRDD.count();
+            logger.info("Map input records={}", recordCount);
+            logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
 
-        long recordCount = recordRDD.count();
-        logger.info("Map input records={}", recordCount);
-        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, 
String.valueOf(recordCount));
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, 
String.valueOf(bytesWritten.value()));
 
-        Map<String, String> counterMap = Maps.newHashMap();
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, 
String.valueOf(recordCount));
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, 
String.valueOf(bytesWritten.value()));
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), 
counterPath, counterMap);
 
-        // save counter to hdfs
-        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, 
counterMap);
-
-        HadoopUtil.deleteHDFSMeta(metaUrl);
+            HadoopUtil.deleteHDFSMeta(metaUrl);
+        }
     }
 
     static class FlatOutputFucntion implements 
PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 37f957fe31..bbdeb85ecc 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -124,36 +124,38 @@ protected void execute(OptionsHelper optionsHelper) 
throws Exception {
         conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", 
"true").registerKryoClasses(kryoClassArray);
 
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        sc.sc().addSparkListener(jobListener);
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            KylinSparkJobListener jobListener = new KylinSparkJobListener();
+            sc.sc().addSparkListener(jobListener);
 
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new 
Path(dictOutputPath));
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new 
Path(dictOutputPath));
 
-        final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
-        final KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = 
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = 
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
 
-        logger.info("Dictionary output path: {}", dictOutputPath);
-        logger.info("Statistics output path: {}", statOutputPath);
+            logger.info("Dictionary output path: {}", dictOutputPath);
+            logger.info("Statistics output path: {}", statOutputPath);
 
-        final TblColRef[] tblColRefs = 
cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
-        final int columnLength = tblColRefs.length;
+            final TblColRef[] tblColRefs = 
cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+            final int columnLength = tblColRefs.length;
 
-        List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+            List<Integer> indexs = 
Lists.newArrayListWithCapacity(columnLength);
 
-        for (int i = 0; i <= columnLength; i++) {
-            indexs.add(i);
-        }
+            for (int i = 0; i <= columnLength; i++) {
+                indexs.add(i);
+            }
 
-        JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+            JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 
1);
 
-        JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new 
MergeDictAndStatsFunction(cubeName, metaUrl,
-                segmentId, segmentIds.split(","), statOutputPath, tblColRefs, 
sConf));
+            JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new 
MergeDictAndStatsFunction(cubeName,
+                    metaUrl, segmentId, segmentIds.split(","), statOutputPath, 
tblColRefs, sConf));
 
-        colToDictPathRDD.coalesce(1, 
false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, 
SequenceFileOutputFormat.class);
+            colToDictPathRDD.coalesce(1, 
false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class,
+                    SequenceFileOutputFormat.class);
+        }
     }
 
     public static class MergeDictAndStatsFunction implements 
PairFunction<Integer, Text, Text> {
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index e2d43ba1c2..96690d00de 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -134,112 +134,113 @@ protected void execute(OptionsHelper optionsHelper) 
throws Exception {
         conf.set("spark.kryo.registrationRequired", 
"true").registerKryoClasses(kryoClassArray);
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        sc.sc().addSparkListener(jobListener);
-        final FileSystem fs = 
partitionFilePath.getFileSystem(sc.hadoopConfiguration());
-        if (!fs.exists(partitionFilePath)) {
-            throw new IllegalArgumentException("File not exist: " + 
partitionFilePath.toString());
-        }
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            final FileSystem fs = 
partitionFilePath.getFileSystem(sc.hadoopConfiguration());
+            if (!fs.exists(partitionFilePath)) {
+                throw new IllegalArgumentException("File not exist: " + 
partitionFilePath.toString());
+            }
 
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new 
Path(outputPath));
+            final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
 
-        final KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            final CubeSegment cubeSegment = 
cubeInstance.getSegmentById(segmentId);
 
-        final MeasureCodec inputCodec = new 
MeasureCodec(cubeDesc.getMeasures());
-        final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+            final MeasureCodec inputCodec = new 
MeasureCodec(cubeDesc.getMeasures());
+            final List<KeyValueCreator> keyValueCreators = 
Lists.newArrayList();
 
-        for (HBaseColumnFamilyDesc cfDesc : 
cubeDesc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+            for (HBaseColumnFamilyDesc cfDesc : 
cubeDesc.getHbaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                    keyValueCreators.add(new KeyValueCreator(cubeDesc, 
colDesc));
+                }
             }
-        }
 
-        final int cfNum = keyValueCreators.size();
-        final boolean quickPath = (keyValueCreators.size() == 1) && 
keyValueCreators.get(0).isFullCopy;
-
-        logger.info("Input path: {}", inputPath);
-        logger.info("Output path: {}", outputPath);
-        // read partition split keys
-        List<RowKeyWritable> keys = new ArrayList<>();
-        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
partitionFilePath, sc.hadoopConfiguration())) {
-            RowKeyWritable key = new RowKeyWritable();
-            Writable value = NullWritable.get();
-            while (reader.next(key, value)) {
-                keys.add(key);
-                logger.info(" ------- split key: {}", key);
-                key = new RowKeyWritable(); // important, new an object!
+            final int cfNum = keyValueCreators.size();
+            final boolean quickPath = (keyValueCreators.size() == 1) && 
keyValueCreators.get(0).isFullCopy;
+
+            logger.info("Input path: {}", inputPath);
+            logger.info("Output path: {}", outputPath);
+            // read partition split keys
+            List<RowKeyWritable> keys = new ArrayList<>();
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
partitionFilePath, sc.hadoopConfiguration())) {
+                RowKeyWritable key = new RowKeyWritable();
+                Writable value = NullWritable.get();
+                while (reader.next(key, value)) {
+                    keys.add(key);
+                    logger.info(" ------- split key: {}", key);
+                    key = new RowKeyWritable(); // important, new an object!
+                }
             }
-        }
 
-        logger.info("There are {} split keys, totally {} hfiles", keys.size(), 
(keys.size() + 1));
+            logger.info("There are {} split keys, totally {} hfiles", 
keys.size(), (keys.size() + 1));
 
-        //HBase conf
-        logger.info("Loading HBase configuration from:{}", hbaseConfFile);
-        FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+            //HBase conf
+            logger.info("Loading HBase configuration from:{}", hbaseConfFile);
+            FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
 
-        Configuration hbaseJobConf = new Configuration();
-        hbaseJobConf.addResource(confInput);
-        hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, 
replication=3
-        Job job = Job.getInstance(hbaseJobConf, 
cubeSegment.getStorageLocationIdentifier());
+            Configuration hbaseJobConf = new Configuration();
+            hbaseJobConf.addResource(confInput);
+            hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, 
replication=3
+            Job job = Job.getInstance(hbaseJobConf, 
cubeSegment.getStorageLocationIdentifier());
 
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
 
-        JavaPairRDD<Text, Text> inputRDDs = 
SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
-        final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
-        if (quickPath) {
-            hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, 
Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, 
Text> textTextTuple2) throws Exception {
-                    KeyValue outputValue = 
keyValueCreators.get(0).create(textTextTuple2._1,
-                            textTextTuple2._2.getBytes(), 0, 
textTextTuple2._2.getLength());
-                    return new Tuple2<>(new 
RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
-                }
-            });
-        } else {
-            hfilerdd = inputRDDs.flatMapToPair(new 
PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Iterator<Tuple2<RowKeyWritable, KeyValue>> 
call(Tuple2<Text, Text> textTextTuple2)
-                        throws Exception {
-
-                    List<Tuple2<RowKeyWritable, KeyValue>> result = 
Lists.newArrayListWithExpectedSize(cfNum);
-                    Object[] inputMeasures = new 
Object[cubeDesc.getMeasures().size()];
-                    
inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, 
textTextTuple2._2.getLength()),
-                            inputMeasures);
-
-                    for (int i = 0; i < cfNum; i++) {
-                        KeyValue outputValue = 
keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
-                        result.add(new Tuple2<>(new 
RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
-                                outputValue));
+            JavaPairRDD<Text, Text> inputRDDs = 
SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+            final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+            if (quickPath) {
+                hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, 
Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, 
Text> textTextTuple2) throws Exception {
+                        KeyValue outputValue = 
keyValueCreators.get(0).create(textTextTuple2._1,
+                                textTextTuple2._2.getBytes(), 0, 
textTextTuple2._2.getLength());
+                        return new Tuple2<>(new 
RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
                     }
+                });
+            } else {
+                hfilerdd = inputRDDs.flatMapToPair(new 
PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<RowKeyWritable, KeyValue>> 
call(Tuple2<Text, Text> textTextTuple2)
+                            throws Exception {
 
-                    return result.iterator();
-                }
-            });
-        }
+                        List<Tuple2<RowKeyWritable, KeyValue>> result = 
Lists.newArrayListWithExpectedSize(cfNum);
+                        Object[] inputMeasures = new 
Object[cubeDesc.getMeasures().size()];
+                        
inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, 
textTextTuple2._2.getLength()),
+                                inputMeasures);
 
-        hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
-                RowKeyWritable.RowKeyComparator.INSTANCE)
-                .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, 
ImmutableBytesWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
-                            Tuple2<RowKeyWritable, KeyValue> 
rowKeyWritableKeyValueTuple2) throws Exception {
-                        return new Tuple2<>(new 
ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
-                                rowKeyWritableKeyValueTuple2._2);
+                        for (int i = 0; i < cfNum; i++) {
+                            KeyValue outputValue = 
keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+                            result.add(new Tuple2<>(new 
RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+                                    outputValue));
+                        }
+
+                        return result.iterator();
                     }
-                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+                });
+            }
 
-        logger.info("HDFS: Number of bytes written={}", 
jobListener.metrics.getBytesWritten());
+            hfilerdd.repartitionAndSortWithinPartitions(new 
HFilePartitioner(keys),
+                    RowKeyWritable.RowKeyComparator.INSTANCE)
+                    .mapToPair(new PairFunction<Tuple2<RowKeyWritable, 
KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                        @Override
+                        public Tuple2<ImmutableBytesWritable, KeyValue> call(
+                                Tuple2<RowKeyWritable, KeyValue> 
rowKeyWritableKeyValueTuple2) throws Exception {
+                            return new Tuple2<>(new 
ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+                                    rowKeyWritableKeyValueTuple2._2);
+                        }
+                    }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 
-        Map<String, String> counterMap = Maps.newHashMap();
-        counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, 
String.valueOf(jobListener.metrics.getBytesWritten()));
+            logger.info("HDFS: Number of bytes written={}", 
jobListener.metrics.getBytesWritten());
 
-        // save counter to hdfs
-        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, 
counterMap);
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, 
String.valueOf(jobListener.metrics.getBytesWritten()));
+
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), 
counterPath, counterMap);
+        }
     }
 
     static class HFilePartitioner extends Partitioner {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Fix sonar reported static code issues
> -------------------------------------
>
>                 Key: KYLIN-3597
>                 URL: https://issues.apache.org/jira/browse/KYLIN-3597
>             Project: Kylin
>          Issue Type: Improvement
>          Components: Others
>            Reporter: Shaofeng SHI
>            Priority: Major
>             Fix For: v2.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to