This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new c897277 KYLIN-3482 Unclosed SetAndUnsetThreadLocalConfig in Spark engine
c897277 is described below
commit c8972772af60d0a6736acb063ff6c4b775790b4a
Author: shaofengshi <[email protected]>
AuthorDate: Wed Aug 29 21:25:24 2018 +0800
KYLIN-3482 Unclosed SetAndUnsetThreadLocalConfig in Spark engine
---
.../kylin/engine/spark/SparkCubingByLayer.java | 74 +++++----
.../kylin/engine/spark/SparkCubingMerge.java | 22 +--
.../kylin/engine/spark/SparkFactDistinct.java | 159 +++++++++---------
.../kylin/engine/spark/SparkMergingDictionary.java | 178 +++++++++++----------
4 files changed, 225 insertions(+), 208 deletions(-)
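For context on the change itself: each of the four Spark job classes used to call KylinConfig.setAndUnsetThreadLocalConfig(...) and drop the returned SetAndUnsetThreadLocalConfig handle, so the thread-local config set on a Spark executor thread was never unset. The patch wraps every such call in try-with-resources so the handle is closed even when the metadata lookups throw. A minimal, self-contained sketch of the idiom follows; the ConfigHolder class below is hypothetical and only illustrates the pattern, it is not the Kylin API:

    public class ThreadLocalConfigDemo {

        /** Hypothetical stand-in for KylinConfig's thread-local config handling. */
        static final class ConfigHolder {
            private static final ThreadLocal<String> CONFIG = new ThreadLocal<>();

            /** Setting the config returns an AutoCloseable handle, mirroring setAndUnsetThreadLocalConfig. */
            static SetAndUnset setAndUnsetThreadLocalConfig(String config) {
                CONFIG.set(config);
                return new SetAndUnset();
            }

            static String current() {
                return CONFIG.get();
            }

            static final class SetAndUnset implements AutoCloseable {
                @Override
                public void close() {
                    CONFIG.remove(); // guaranteed to run when used with try-with-resources
                }
            }
        }

        public static void main(String[] args) {
            // Old, leaky usage: the handle is discarded, so the thread-local config
            // outlives this piece of work and stays attached to the thread.
            ConfigHolder.setAndUnsetThreadLocalConfig("config-A");
            System.out.println("after leaky call: " + ConfigHolder.current()); // config-A (still set)

            // Patched usage: try-with-resources closes the handle and removes the
            // thread-local even if the body throws.
            try (ConfigHolder.SetAndUnset autoUnset = ConfigHolder.setAndUnsetThreadLocalConfig("config-B")) {
                System.out.println("inside try: " + ConfigHolder.current());   // config-B
            }
            System.out.println("after try: " + ConfigHolder.current());        // null (cleaned up)
        }
    }

Because Spark reuses executor threads across tasks, a config that is never unset can linger on the thread; closing the handle on every code path, as the diff below does, avoids that.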
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 9f4ae34..09098d8 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,12 +17,6 @@
*/
package org.apache.kylin.engine.spark;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -68,9 +62,14 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Tuple2;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
/**
* Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
*/
@@ -235,10 +234,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
            synchronized (SparkCubingByLayer.class) {
                if (initialized == false) {
                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                   KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
-                   CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                   codec = new BufferedMeasureCodec(desc.getMeasures());
-                   initialized = true;
+                   try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                           .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                       CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                       codec = new BufferedMeasureCodec(desc.getMeasures());
+                       initialized = true;
+                   }
                }
            }
        }
@@ -274,18 +275,20 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
            synchronized (SparkCubingByLayer.class) {
                if (initialized == false) {
                    KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-                   KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-                   CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-                   CubeDesc cubeDesc = cubeInstance.getDescriptor();
-                   CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-                   CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
-                           EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-                   long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-                   Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
-                   baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
-                           AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
-                           MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
-                   initialized = true;
+                   try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                           .setAndUnsetThreadLocalConfig(kConfig)) {
+                       CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                       CubeDesc cubeDesc = cubeInstance.getDescriptor();
+                       CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+                       CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
+                               EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+                       long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                       Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
+                       baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+                               AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                               MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+                       initialized = true;
+                   }
                }
            }
        }
@@ -313,11 +316,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
        public void init() {
            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-           KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-           CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-           cubeDesc = cubeInstance.getDescriptor();
-           aggregators = new MeasureAggregators(cubeDesc.getMeasures());
-           measureNum = cubeDesc.getMeasures().size();
+           try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                   .setAndUnsetThreadLocalConfig(kConfig)) {
+               CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+               cubeDesc = cubeInstance.getDescriptor();
+               aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+               measureNum = cubeDesc.getMeasures().size();
+           }
        }

        @Override
@@ -383,12 +388,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
        public void init() {
            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-           KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-           CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-           this.cubeSegment = cubeInstance.getSegmentById(segmentId);
-           this.cubeDesc = cubeInstance.getDescriptor();
-           this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
-           this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+           try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) {
+               CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+               this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+               this.cubeDesc = cubeInstance.getDescriptor();
+               this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+               this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+           }
        }

        @Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 74a2313..991c31e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -17,10 +17,7 @@
*/
package org.apache.kylin.engine.spark;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
+import com.google.common.collect.Lists;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -56,11 +53,12 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
import scala.Tuple2;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
/**
*/
public class SparkCubingMerge extends AbstractApplication implements Serializable {
@@ -158,10 +156,12 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
            synchronized (SparkCubingMerge.class) {
                if (initialized == false) {
                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                   KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
-                   CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                   codec = new BufferedMeasureCodec(desc.getMeasures());
-                   initialized = true;
+                   try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                           .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                       CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                       codec = new BufferedMeasureCodec(desc.getMeasures());
+                       initialized = true;
+                   }
                }
            }
        }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 6188a56..77ebd69 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -18,17 +18,13 @@
package org.apache.kylin.engine.spark;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -89,18 +85,20 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
import scala.Tuple2;
import scala.Tuple3;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
public class SparkFactDistinct extends AbstractApplication implements Serializable {
    protected static final Logger logger = LoggerFactory.getLogger(SparkFactDistinct.class);
@@ -250,32 +248,37 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
        private void init() {
            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-           KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-           CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-           CubeDesc cubeDesc = cubeInstance.getDescriptor();
-           CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-           CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+           try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                   .setAndUnsetThreadLocalConfig(kConfig)) {
+               CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+               CubeDesc cubeDesc = cubeInstance.getDescriptor();
+               CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+               CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(
+                       EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);

-           reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
-           tmpbuf = ByteBuffer.allocate(4096);
+               reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+               tmpbuf = ByteBuffer.allocate(4096);

-           int[] rokeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+               int[] rokeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();

-           Long[] cuboidIds = getCuboidIds(cubeSegment);
+               Long[] cuboidIds = getCuboidIds(cubeSegment);

-           Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rokeyColumnIndexes.length);
+               Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rokeyColumnIndexes.length);

-           boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
+               boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);

-           HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length, cubeDesc.getConfig().getCubeStatsHLLPrecision());
+               HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length,
+                       cubeDesc.getConfig().getCubeStatsHLLPrecision());

-           cuboidStatCalculator = new CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet, isNewAlgorithm, cuboidsHLL);
-           allCols = reducerMapping.getAllDimDictCols();
+               cuboidStatCalculator = new CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet,
+                       isNewAlgorithm, cuboidsHLL);
+               allCols = reducerMapping.getAllDimDictCols();

-           initDictColDeduper(cubeDesc);
-           initColumnIndex(intermediateTableDesc);
+               initDictColDeduper(cubeDesc);
+               initColumnIndex(intermediateTableDesc);

-           initialized = true;
+               initialized = true;
+           }
        }

        @Override
@@ -572,11 +575,12 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
        private void init() {
            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-           KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-           CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-           reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
-
-           initialized = true;
+           try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                   .setAndUnsetThreadLocalConfig(kConfig)) {
+               CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+               reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+               initialized = true;
+           }
        }

        @Override
@@ -641,45 +645,46 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
        private void init() throws IOException {
            taskId = TaskContext.getPartitionId();
            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-           KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-           CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-           cubeDesc = cubeInstance.getDescriptor();
-           cubeConfig = cubeInstance.getConfig();
-           reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
-
-           result = Lists.newArrayList();
-
-           if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
-               // hll
-               isStatistics = true;
-               baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
-               baseCuboidRowCountInMappers = Lists.newArrayList();
-               cuboidHLLMap = Maps.newHashMap();
-
-               logger.info("Partition " + taskId + " handling stats");
-           } else {
-               // normal col
-               col = reducerMapping.getColForReducer(taskId);
-               Preconditions.checkNotNull(col);
-
-               // local build dict
-               buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
-               if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
-                   buildDictInReducer = false;
-               }
+           try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) {
+               CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+               cubeDesc = cubeInstance.getDescriptor();
+               cubeConfig = cubeInstance.getConfig();
+               reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+               result = Lists.newArrayList();
+
+               if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+                   // hll
+                   isStatistics = true;
+                   baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
+                   baseCuboidRowCountInMappers = Lists.newArrayList();
+                   cuboidHLLMap = Maps.newHashMap();
+
+                   logger.info("Partition " + taskId + " handling stats");
+               } else {
+                   // normal col
+                   col = reducerMapping.getColForReducer(taskId);
+                   Preconditions.checkNotNull(col);
+
+                   // local build dict
+                   buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
+                   if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+                       buildDictInReducer = false;
+                   }

-               if (reducerMapping.getReducerNumForDimCol(col) > 1) {
-                   buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
-               }
+                   if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+                       buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+                   }

-               if (buildDictInReducer) {
-                   builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
-                   builder.init(null, 0, null);
+                   if (buildDictInReducer) {
+                       builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+                       builder.init(null, 0, null);
+                   }
+                   logger.info("Partition " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
                }
-               logger.info("Partition " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
-           }
-           initialized = true;
+               initialized = true;
+           }
        }

        private void logAFewRows(String value) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index deb7968..4d4346f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -18,13 +18,8 @@
package org.apache.kylin.engine.spark;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -66,12 +61,15 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import scala.Tuple2;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
/**
merge dictionary
*/
@@ -184,10 +182,12 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
        private void init() {
            kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-           KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
-           CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-           dictMgr = DictionaryManager.getInstance(kylinConfig);
-           mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+           try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                   .setAndUnsetThreadLocalConfig(kylinConfig)) {
+               CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+               dictMgr = DictionaryManager.getInstance(kylinConfig);
+               mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+           }
        }

        @Override
@@ -205,92 +205,98 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
                // merge dictionary
                TblColRef col = tblColRefs[index];
                List<DictionaryInfo> dictInfos = Lists.newArrayList();
-               for (CubeSegment segment : mergingSegments) {
-                   if (segment.getDictResPath(col) != null) {
-                       DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
-                       if (dictInfo != null && !dictInfos.contains(dictInfo)) {
-                           dictInfos.add(dictInfo);
+               try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                       .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                   for (CubeSegment segment : mergingSegments) {
+                       if (segment.getDictResPath(col) != null) {
+                           DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                           if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                               dictInfos.add(dictInfo);
+                           }
                        }
                    }
-               }

-               DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
-               String tblCol = col.getTableAlias() + ":" + col.getName();
-               String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+                   DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+                   String tblCol = col.getTableAlias() + ":" + col.getName();
+                   String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();

-               return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+                   return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+               }
            } else {
                // merge statistics
-               CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-               CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
-               ResourceStore rs = ResourceStore.getStore(kylinConfig);
-
-               Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
-               Configuration conf = null;
-               int averageSamplingPercentage = 0;
-
-               for (CubeSegment cubeSegment : mergingSegments) {
-                   String filePath = cubeSegment.getStatisticsResourcePath();
-                   InputStream is = rs.getResource(filePath).inputStream;
-                   File tempFile;
-                   FileOutputStream tempFileStream = null;
-
-                   try {
-                       tempFile = File.createTempFile(segmentId, ".seq");
-                       tempFileStream = new FileOutputStream(tempFile);
-                       org.apache.commons.io.IOUtils.copy(is, tempFileStream);
-                   } finally {
-                       IOUtils.closeStream(is);
-                       IOUtils.closeStream(tempFileStream);
-                   }
-
-                   FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
-                   SequenceFile.Reader reader = null;
+               try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                       .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                   CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                   CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+                   ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+                   Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+                   Configuration conf = null;
+                   int averageSamplingPercentage = 0;
+
+                   for (CubeSegment cubeSegment : mergingSegments) {
+                       String filePath = cubeSegment.getStatisticsResourcePath();
+                       InputStream is = rs.getResource(filePath).inputStream;
+                       File tempFile;
+                       FileOutputStream tempFileStream = null;
+
+                       try {
+                           tempFile = File.createTempFile(segmentId, ".seq");
+                           tempFileStream = new FileOutputStream(tempFile);
+                           org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                       } finally {
+                           IOUtils.closeStream(is);
+                           IOUtils.closeStream(tempFileStream);
+                       }

-                   try {
-                       conf = HadoopUtil.getCurrentConfiguration();
-                       //noinspection deprecation
-                       reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
-                       LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                       BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
-                       while (reader.next(key, value)) {
-                           if (key.get() == 0L) {
-                               // sampling percentage;
-                               averageSamplingPercentage += Bytes.toInt(value.getBytes());
-                           } else if (key.get() > 0) {
-                               HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
-                               ByteArray byteArray = new ByteArray(value.getBytes());
-                               hll.readRegisters(byteArray.asBuffer());
-
-                               if (cuboidHLLMap.get(key.get()) != null) {
-                                   cuboidHLLMap.get(key.get()).merge(hll);
-                               } else {
-                                   cuboidHLLMap.put(key.get(), hll);
+                       FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                       SequenceFile.Reader reader = null;
+
+                       try {
+                           conf = HadoopUtil.getCurrentConfiguration();
+                           //noinspection deprecation
+                           reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                           LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                           BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                           while (reader.next(key, value)) {
+                               if (key.get() == 0L) {
+                                   // sampling percentage;
+                                   averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                               } else if (key.get() > 0) {
+                                   HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                                   ByteArray byteArray = new ByteArray(value.getBytes());
+                                   hll.readRegisters(byteArray.asBuffer());
+
+                                   if (cuboidHLLMap.get(key.get()) != null) {
+                                       cuboidHLLMap.get(key.get()).merge(hll);
+                                   } else {
+                                       cuboidHLLMap.put(key.get(), hll);
+                                   }
                                }
                            }
+                       } finally {
+                           IOUtils.closeStream(reader);
+                       }
-                   } finally {
-                       IOUtils.closeStream(reader);
-                   }
                    }
-               }
-               averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-               CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
-               Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+                   averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+                   CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+                   Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);

-               FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
-               FSDataInputStream fis = fs.open(statisticsFilePath);
+                   FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+                   FSDataInputStream fis = fs.open(statisticsFilePath);

-               try {
-                   // put the statistics to metadata store
-                   String statisticsFileName = newSegment.getStatisticsResourcePath();
-                   rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
-               } finally {
-                   IOUtils.closeStream(fis);
-               }
+                   try {
+                       // put the statistics to metadata store
+                       String statisticsFileName = newSegment.getStatisticsResourcePath();
+                       rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+                   } finally {
+                       IOUtils.closeStream(fis);
+                   }

-               return new Tuple2<>(new Text(""), new Text(""));
+                   return new Tuple2<>(new Text(""), new Text(""));
+               }
            }
        }