This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new c897277  KYLIN-3482 Unclosed SetAndUnsetThreadLocalConfig in Spark engine
c897277 is described below

commit c8972772af60d0a6736acb063ff6c4b775790b4a
Author: shaofengshi <[email protected]>
AuthorDate: Wed Aug 29 21:25:24 2018 +0800

    KYLIN-3482 Unclosed SetAndUnsetThreadLocalConfig in Spark engine
---
 .../kylin/engine/spark/SparkCubingByLayer.java     |  74 +++++----
 .../kylin/engine/spark/SparkCubingMerge.java       |  22 +--
 .../kylin/engine/spark/SparkFactDistinct.java      | 159 +++++++++---------
 .../kylin/engine/spark/SparkMergingDictionary.java | 178 +++++++++++----------
 4 files changed, 225 insertions(+), 208 deletions(-)
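
    The change is the same in all four files: the KylinConfig.SetAndUnsetThreadLocalConfig
    handle returned by KylinConfig.setAndUnsetThreadLocalConfig() was previously discarded,
    so the config installed in the thread local on the Spark executor thread was never
    cleared; each call site now holds the handle in a try-with-resources block so it is
    closed when initialization finishes. Below is a minimal sketch of that pattern, not
    part of the patch itself; the ThreadLocalConfigSketch class and its loadConfig()
    helper are illustrative stand-ins (loadConfig() plays the role of
    AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl) in the real code).

    import org.apache.kylin.common.KylinConfig;

    public class ThreadLocalConfigSketch {

        // Illustrative stand-in for AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl).
        private static KylinConfig loadConfig() {
            return KylinConfig.getInstanceFromEnv();
        }

        // Before (KYLIN-3482): the returned handle is dropped, so the config
        // stays registered in the thread local after init() returns.
        static void initLeaky() {
            KylinConfig config = loadConfig();
            KylinConfig.setAndUnsetThreadLocalConfig(config);
            // ... look up cube metadata with the thread-local config in place ...
        }

        // After: try-with-resources closes the handle, which removes the
        // thread-local config even if the metadata lookup throws.
        static void initClosed() {
            KylinConfig config = loadConfig();
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
                    .setAndUnsetThreadLocalConfig(config)) {
                // ... look up cube metadata with the thread-local config in place ...
            }
        }
    }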

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 9f4ae34..09098d8 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,12 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -68,9 +62,14 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple2;
 
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -235,10 +234,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                             synchronized (SparkCubingByLayer.class) {
                                 if (initialized == false) {
                                     KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                                    KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
-                                    CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                                    codec = new BufferedMeasureCodec(desc.getMeasures());
-                                    initialized = true;
+                                    try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                                            .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                                        CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                        codec = new BufferedMeasureCodec(desc.getMeasures());
+                                        initialized = true;
+                                    }
                                 }
                             }
                         }
@@ -274,18 +275,20 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
                         KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-                        KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-                        CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-                        CubeDesc cubeDesc = cubeInstance.getDescriptor();
-                        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-                        CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
-                                EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-                        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-                        Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
-                        baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
-                                AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
-                                MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
-                        initialized = true;
+                        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                                .setAndUnsetThreadLocalConfig(kConfig)) {
+                            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                            CubeDesc cubeDesc = cubeInstance.getDescriptor();
+                            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+                            CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
+                                    EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+                            long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                            Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
+                            baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+                                    AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                                    MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+                            initialized = true;
+                        }
                     }
                 }
             }
@@ -313,11 +316,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         public void init() {
             KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-            cubeDesc = cubeInstance.getDescriptor();
-            aggregators = new MeasureAggregators(cubeDesc.getMeasures());
-            measureNum = cubeDesc.getMeasures().size();
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                cubeDesc = cubeInstance.getDescriptor();
+                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+                measureNum = cubeDesc.getMeasures().size();
+            }
         }
 
         @Override
@@ -383,12 +388,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         public void init() {
             KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-            this.cubeSegment = cubeInstance.getSegmentById(segmentId);
-            this.cubeDesc = cubeInstance.getDescriptor();
-            this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
-            this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+                this.cubeDesc = cubeInstance.getDescriptor();
+                this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+                this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+            }
         }
 
         @Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 74a2313..991c31e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -17,10 +17,7 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
+import com.google.common.collect.Lists;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -56,11 +53,12 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
 import scala.Tuple2;
 
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
 /**
  */
 public class SparkCubingMerge extends AbstractApplication implements Serializable {
@@ -158,10 +156,12 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
                             synchronized (SparkCubingMerge.class) {
                                 if (initialized == false) {
                                     KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                                    KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
-                                    CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                                    codec = new BufferedMeasureCodec(desc.getMeasures());
-                                    initialized = true;
+                                    try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                                            .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                                        CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                        codec = new BufferedMeasureCodec(desc.getMeasures());
+                                        initialized = true;
+                                    }
                                 }
                             }
                         }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 6188a56..77ebd69 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -18,17 +18,13 @@
 
 package org.apache.kylin.engine.spark;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -89,18 +85,20 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
 import scala.Tuple2;
 import scala.Tuple3;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class SparkFactDistinct extends AbstractApplication implements Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkFactDistinct.class);
@@ -250,32 +248,37 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
 
         private void init() {
             KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-            CubeDesc cubeDesc = cubeInstance.getDescriptor();
-            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-            CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                CubeDesc cubeDesc = cubeInstance.getDescriptor();
+                CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+                CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(
+                        EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
-            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
-            tmpbuf = ByteBuffer.allocate(4096);
+                reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+                tmpbuf = ByteBuffer.allocate(4096);
 
-            int[] rokeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+                int[] rokeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
 
-            Long[] cuboidIds = getCuboidIds(cubeSegment);
+                Long[] cuboidIds = getCuboidIds(cubeSegment);
 
-            Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rokeyColumnIndexes.length);
+                Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rokeyColumnIndexes.length);
 
-            boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
+                boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
 
-            HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length, cubeDesc.getConfig().getCubeStatsHLLPrecision());
+                HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length,
+                        cubeDesc.getConfig().getCubeStatsHLLPrecision());
 
-            cuboidStatCalculator = new CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet, isNewAlgorithm, cuboidsHLL);
-            allCols = reducerMapping.getAllDimDictCols();
+                cuboidStatCalculator = new CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet,
+                        isNewAlgorithm, cuboidsHLL);
+                allCols = reducerMapping.getAllDimDictCols();
 
-            initDictColDeduper(cubeDesc);
-            initColumnIndex(intermediateTableDesc);
+                initDictColDeduper(cubeDesc);
+                initColumnIndex(intermediateTableDesc);
 
-            initialized = true;
+                initialized = true;
+            }
         }
 
         @Override
@@ -572,11 +575,12 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
 
         private void init() {
             KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
-
-            initialized = true;
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+                initialized = true;
+            }
         }
 
         @Override
@@ -641,45 +645,46 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
         private void init() throws IOException {
             taskId = TaskContext.getPartitionId();
             KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-            cubeDesc = cubeInstance.getDescriptor();
-            cubeConfig = cubeInstance.getConfig();
-            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
-
-            result = Lists.newArrayList();
-
-            if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
-                // hll
-                isStatistics = true;
-                baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
-                baseCuboidRowCountInMappers = Lists.newArrayList();
-                cuboidHLLMap = Maps.newHashMap();
-
-                logger.info("Partition " + taskId + " handling stats");
-            } else {
-                // normal col
-                col = reducerMapping.getColForReducer(taskId);
-                Preconditions.checkNotNull(col);
-
-                // local build dict
-                buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
-                if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
-                    buildDictInReducer = false;
-                }
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                cubeDesc = cubeInstance.getDescriptor();
+                cubeConfig = cubeInstance.getConfig();
+                reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+                result = Lists.newArrayList();
+
+                if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+                    // hll
+                    isStatistics = true;
+                    baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
+                    baseCuboidRowCountInMappers = Lists.newArrayList();
+                    cuboidHLLMap = Maps.newHashMap();
+
+                    logger.info("Partition " + taskId + " handling stats");
+                } else {
+                    // normal col
+                    col = reducerMapping.getColForReducer(taskId);
+                    Preconditions.checkNotNull(col);
+
+                    // local build dict
+                    buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
+                    if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+                        buildDictInReducer = false;
+                    }
 
-                if (reducerMapping.getReducerNumForDimCol(col) > 1) {
-                    buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
-                }
+                    if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+                        buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+                    }
 
-                if (buildDictInReducer) {
-                    builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
-                    builder.init(null, 0, null);
+                    if (buildDictInReducer) {
+                        builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+                        builder.init(null, 0, null);
+                    }
+                    logger.info("Partition " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
                 }
-                logger.info("Partition " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
-            }
 
-            initialized = true;
+                initialized = true;
+            }
         }
 
         private void logAFewRows(String value) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index deb7968..4d4346f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -18,13 +18,8 @@
 
 package org.apache.kylin.engine.spark;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -66,12 +61,15 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import scala.Tuple2;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 /**
     merge dictionary
  */
@@ -184,10 +182,12 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
 
         private void init() {
             kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-            dictMgr = DictionaryManager.getInstance(kylinConfig);
-            mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                dictMgr = DictionaryManager.getInstance(kylinConfig);
+                mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+            }
         }
 
         @Override
@@ -205,92 +205,98 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
                 // merge dictionary
                 TblColRef col = tblColRefs[index];
                 List<DictionaryInfo> dictInfos = Lists.newArrayList();
-                for (CubeSegment segment : mergingSegments) {
-                    if (segment.getDictResPath(col) != null) {
-                        DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
-                        if (dictInfo != null && !dictInfos.contains(dictInfo)) {
-                            dictInfos.add(dictInfo);
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                        .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                    for (CubeSegment segment : mergingSegments) {
+                        if (segment.getDictResPath(col) != null) {
+                            DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                            if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                                dictInfos.add(dictInfo);
+                            }
                         }
                     }
-                }
 
-                DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
-                String tblCol = col.getTableAlias() + ":" + col.getName();
-                String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+                    DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+                    String tblCol = col.getTableAlias() + ":" + col.getName();
+                    String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
 
-                return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+                    return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+                }
             } else {
                 // merge statistics
-                CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-                CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
-                ResourceStore rs = ResourceStore.getStore(kylinConfig);
-
-                Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
-                Configuration conf = null;
-                int averageSamplingPercentage = 0;
-
-                for (CubeSegment cubeSegment : mergingSegments) {
-                    String filePath = cubeSegment.getStatisticsResourcePath();
-                    InputStream is = rs.getResource(filePath).inputStream;
-                    File tempFile;
-                    FileOutputStream tempFileStream = null;
-
-                    try {
-                        tempFile = File.createTempFile(segmentId, ".seq");
-                        tempFileStream = new FileOutputStream(tempFile);
-                        org.apache.commons.io.IOUtils.copy(is, tempFileStream);
-                    } finally {
-                        IOUtils.closeStream(is);
-                        IOUtils.closeStream(tempFileStream);
-                    }
-
-                    FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
-                    SequenceFile.Reader reader = null;
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                        .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                    CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                    CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+                    ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+                    Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+                    Configuration conf = null;
+                    int averageSamplingPercentage = 0;
+
+                    for (CubeSegment cubeSegment : mergingSegments) {
+                        String filePath = cubeSegment.getStatisticsResourcePath();
+                        InputStream is = rs.getResource(filePath).inputStream;
+                        File tempFile;
+                        FileOutputStream tempFileStream = null;
+
+                        try {
+                            tempFile = File.createTempFile(segmentId, ".seq");
+                            tempFileStream = new FileOutputStream(tempFile);
+                            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                        } finally {
+                            IOUtils.closeStream(is);
+                            IOUtils.closeStream(tempFileStream);
+                        }
 
-                    try {
-                        conf = HadoopUtil.getCurrentConfiguration();
-                        //noinspection deprecation
-                        reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
-                        LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
-                        while (reader.next(key, value)) {
-                            if (key.get() == 0L) {
-                                // sampling percentage;
-                                averageSamplingPercentage += Bytes.toInt(value.getBytes());
-                            } else if (key.get() > 0) {
-                                HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
-                                ByteArray byteArray = new ByteArray(value.getBytes());
-                                hll.readRegisters(byteArray.asBuffer());
-
-                                if (cuboidHLLMap.get(key.get()) != null) {
-                                    cuboidHLLMap.get(key.get()).merge(hll);
-                                } else {
-                                    cuboidHLLMap.put(key.get(), hll);
+                        FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                        SequenceFile.Reader reader = null;
+
+                        try {
+                            conf = HadoopUtil.getCurrentConfiguration();
+                            //noinspection deprecation
+                            reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                            while (reader.next(key, value)) {
+                                if (key.get() == 0L) {
+                                    // sampling percentage;
+                                    averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                                } else if (key.get() > 0) {
+                                    HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                                    ByteArray byteArray = new ByteArray(value.getBytes());
+                                    hll.readRegisters(byteArray.asBuffer());
+
+                                    if (cuboidHLLMap.get(key.get()) != null) {
+                                        cuboidHLLMap.get(key.get()).merge(hll);
+                                    } else {
+                                        cuboidHLLMap.put(key.get(), hll);
+                                    }
                                 }
                             }
+                        } finally {
+                            IOUtils.closeStream(reader);
                         }
-                    } finally {
-                        IOUtils.closeStream(reader);
                     }
-                }
 
-                averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-                CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
-                Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+                    averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+                    CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+                    Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
 
-                FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
-                FSDataInputStream fis = fs.open(statisticsFilePath);
+                    FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+                    FSDataInputStream fis = fs.open(statisticsFilePath);
 
-                try {
-                    // put the statistics to metadata store
-                    String statisticsFileName = newSegment.getStatisticsResourcePath();
-                    rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
-                } finally {
-                    IOUtils.closeStream(fis);
-                }
+                    try {
+                        // put the statistics to metadata store
+                        String statisticsFileName = newSegment.getStatisticsResourcePath();
+                        rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+                    } finally {
+                        IOUtils.closeStream(fis);
+                    }
 
-                return new Tuple2<>(new Text(""), new Text(""));
+                    return new Tuple2<>(new Text(""), new Text(""));
+                }
 
         }
