Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2314 ab65bd56e -> 8cbd6a6c1


KYLIN-2314 Use col identity instead of col name in dictionary refs


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8cbd6a6c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8cbd6a6c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8cbd6a6c

Branch: refs/heads/KYLIN-2314
Commit: 8cbd6a6c14f9639145faa2b8ea728e5d4f510c46
Parents: ab65bd5
Author: Yang Li <[email protected]>
Authored: Sat Dec 24 13:14:37 2016 +0800
Committer: Yang Li <[email protected]>
Committed: Sat Dec 24 13:14:37 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java |  2 +-
 .../java/org/apache/kylin/cube/CubeSegment.java | 30 ++++----
 .../kylin/engine/mr/JobBuilderSupport.java      |  2 -
 .../engine/mr/steps/CreateDictionaryJob.java    |  5 +-
 .../engine/mr/steps/CubingExecutableUtil.java   |  8 --
 .../engine/mr/steps/FactDistinctColumnsJob.java |  3 +
 .../mr/steps/FactDistinctColumnsReducer.java    | 39 ++++++----
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 81 ++++++++++----------
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |  4 +-
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java  |  1 -
 10 files changed, 87 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fe2030a..9670b89 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -240,7 +240,7 @@ public class CubeManager implements IRealizationProvider {
         if (dictInfo != null) {
             Dictionary<?> dict = dictInfo.getDictionaryObject();
             cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-            cubeSeg.getRowkeyStats().add(new Object[] { col.getName(), dict.getSize(), dict.getSizeOfId() });
+            cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
 
             CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance());
             update.setToUpdateSegs(cubeSeg);

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index e155f86..36a6044 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -98,9 +98,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
     @JsonProperty("snapshots")
     private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
 
-    @JsonProperty("index_path")
-    private String indexPath;
-
     @JsonProperty("rowkey_stats")
     private List<Object[]> rowkeyStats = Lists.newArrayList();
 
@@ -296,15 +293,22 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
     }
 
     public String getDictResPath(TblColRef col) {
-        return getDictionaries().get(dictKey(col));
+        String r;
+        String dictKey = col.getIdentity();
+        r = getDictionaries().get(dictKey);
+        
+        // try Kylin v1.x dict key as well
+        if (r == null) {
+            String v1DictKey = col.getTable() + "/" + col.getName();
+            r = getDictionaries().get(v1DictKey);
+        }
+        
+        return r;
     }
 
     public void putDictResPath(TblColRef col, String dictResPath) {
-        getDictionaries().put(dictKey(col), dictResPath);
-    }
-
-    private String dictKey(TblColRef col) {
-        return col.getTable() + "/" + col.getName();
+        String dictKey = col.getIdentity();
+        getDictionaries().put(dictKey, dictResPath);
     }
 
     public void setStorageLocationIdentifier(String storageLocationIdentifier) {
@@ -523,14 +527,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
         return cubeInstance;
     }
 
-    public String getIndexPath() {
-        return indexPath;
-    }
-
-    public void setIndexPath(String indexPath) {
-        this.indexPath = indexPath;
-    }
-
     public Map<String, String> getAdditionalInfo() {
         return additionalInfo;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 47eb9c3..5f5814b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -100,7 +100,6 @@ public class JobBuilderSupport {
         CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
         CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
         CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
-        CubingExecutableUtil.setIndexPath(this.getSecondaryIndexPath(jobId), result.getParams());
 
 
         return result;
@@ -125,7 +124,6 @@ public class JobBuilderSupport {
         CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
         CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
         CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
-        CubingExecutableUtil.setIndexPath(this.getSecondaryIndexPath(jobId), result.getParams());
 
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 4985503..8b9697e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -57,14 +57,13 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
         DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() {
             @Override
             public ReadableTable getDistinctValuesFor(TblColRef col) {
-                return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getName(), col.getType());
+                return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType());
             }
         }, new DictionaryProvider() {
 
             @Override
             public Dictionary<String> getDictionary(TblColRef col) throws IOException {
-                Path colDir = new Path(factColumnsInputPath, col.getName());
-                Path dictFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+                Path dictFile = new Path(factColumnsInputPath, col.getIdentity() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
                 FileSystem fs = HadoopUtil.getFileSystem(dictFile.toString());
                 if (fs.exists(dictFile) == false)
                     return null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
index b0d5a89..65c5869 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@ -135,12 +135,4 @@ public class CubingExecutableUtil {
         return params.get(MERGED_STATISTICS_PATH);
     }
 
-    public static void setIndexPath(String indexPath, Map<String, String> params) {
-        params.put(INDEX_PATH, indexPath);
-    }
-
-    public static String getIndexPath(Map<String, String> params) {
-        return params.get(INDEX_PATH);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 2eb694e..9fc8922 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -145,6 +145,9 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);
 
+        // important, reducer writes HDFS directly at the moment
+        job.setReduceSpeculativeExecution(false);
+        
         FileOutputFormat.setOutputPath(job, output);
         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 0223914..5d42797 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -59,7 +59,7 @@ import com.google.common.collect.Maps;
  */
 public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableKey, Text, NullWritable, Text> {
 
-    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
+    private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
     private List<TblColRef> columnList;
     private String statisticsOutput = null;
@@ -76,6 +76,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private int uhcReducerCount;
     private Map<Integer, Integer> reducerIdToColumnIndex = new HashMap<>();
     private int taskId;
+    private boolean isPartitionCol = false;
+    private int rowCount = 0;
 
     //local build dict
     private boolean isReducerLocalBuildDict;
@@ -84,7 +86,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private long timeMinValue = Long.MAX_VALUE;
     public static final String DICT_FILE_POSTFIX = ".rldict";
     public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
-    private boolean isPartitionCol = false;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -111,6 +112,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             baseCuboidRowCountInMappers = Lists.newArrayList();
             cuboidHLLMap = Maps.newHashMap();
             samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+            logger.info("Reducer " + taskId + " handling stats");
         } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
             // partition col
             isStatistics = false;
@@ -120,6 +122,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                 logger.info("Do not have partition col. This reducer will keep empty");
             }
             colValues = Lists.newLinkedList();
+            logger.info("Reducer " + taskId + " handling partition column " + col);
         } else {
             // normal col
             isStatistics = false;
@@ -135,6 +138,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                 builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
                 builder.init(null, 0);
             }
+            logger.info("Reducer " + taskId + " handling column " + col + ", isReducerLocalBuildDict=" + isReducerLocalBuildDict);
         }
     }
 
@@ -178,6 +182,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         } else if (isPartitionCol) {
             // partition col
             String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+            logAFewRows(value);
             long time = DateFormat.stringToMillis(value);
             timeMinValue = Math.min(timeMinValue, time);
             timeMaxValue = Math.max(timeMaxValue, time);
@@ -185,6 +190,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             // normal col
             if (isReducerLocalBuildDict) {
                 String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+                logAFewRows(value);
                 builder.addValue(value);
             } else {
                 colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
@@ -195,14 +201,22 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                 }
             }
         }
+        
+        rowCount++;
+    }
+
+    private void logAFewRows(String value) {
+        if (rowCount < 10) {
+            logger.info("Received value: " + value);
+        }
     }
 
     private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = FileSystem.get(conf);
         final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path colDir = new Path(outputPath, col.getName());
-        final String fileName = col.getName() + "-" + taskId % uhcReducerCount;
+        final Path colDir = new Path(outputPath, col.getIdentity());
+        final String fileName = col.getIdentity() + "-" + taskId % uhcReducerCount;
         final Path outputFile = new Path(colDir, fileName);
 
         FSDataOutputStream out = null;
@@ -229,7 +243,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     }
 
     private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException {
-        final String fileName = col.getName() + DICT_FILE_POSTFIX;
+        final String fileName = col.getIdentity() + DICT_FILE_POSTFIX;
         FSDataOutputStream out = getOutputStream(context, fileName);
         try {
             String dictClassName = dict.getClass().getName();
@@ -242,7 +256,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     }
 
     private void outputPartitionInfo(Context context) throws IOException {
-        final String fileName = col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
+        final String fileName = col.getIdentity() + PARTITION_COL_INFO_FILE_POSTFIX;
         FSDataOutputStream out = getOutputStream(context, fileName);
         try {
             out.writeLong(timeMinValue);
@@ -256,15 +270,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException {
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = FileSystem.get(conf);
-        final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path colDir = new Path(outputPath, col.getName());
-        final Path outputFile = new Path(colDir, outputFileName);
-        FSDataOutputStream out = null;
-        if (!fs.exists(colDir)) {
-            fs.mkdirs(colDir);
+        final Path outputPath = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH));
+        final Path outputFile = new Path(outputPath, outputFileName);
+        if (!fs.exists(outputPath)) {
+            fs.mkdirs(outputPath);
         }
-        fs.deleteOnExit(outputFile);
-        out = fs.create(outputFile);
+        FSDataOutputStream out = fs.create(outputFile);
         return out;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 5692c76..ed65343 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -31,6 +31,8 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
@@ -41,6 +43,8 @@ import com.google.common.hash.Hashing;
  */
 public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
 
+    private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class);
+
     protected boolean collectStatistics = false;
     protected CuboidScheduler cuboidScheduler = null;
     protected int nRowKey;
@@ -51,7 +55,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     private int rowCount = 0;
     private int samplingPercentage;
     private ByteArray[] row_hashcodes = null;
-    private ByteBuffer keyBuffer;
+    private ByteBuffer tmpbuf;
     private static final Text EMPTY_TEXT = new Text();
     public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
     public static final byte MARK_FOR_HLL = (byte) 0xFF;
@@ -62,7 +66,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     @Override
     protected void setup(Context context) throws IOException {
         super.setup(context);
-        keyBuffer = ByteBuffer.allocate(4096);
+        tmpbuf = ByteBuffer.allocate(4096);
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
             samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
@@ -127,55 +131,54 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         String[] row = flatTableInputFormat.parseMapperInput(record);
 
-        keyBuffer.clear();
-        try {
-            for (int i = 0; i < factDictCols.size(); i++) {
-                String fieldValue = row[dictionaryColumnIndex[i]];
-                if (fieldValue == null)
-                    continue;
-                int offset = keyBuffer.position();
-
-                int reducerIndex;
-                if (uhcIndex[i] == 0) {
-                    //for the normal dictionary column
-                    reducerIndex = columnIndexToReducerBeginId.get(i);
-                } else {
-                    //for the uhc
-                    reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
-                }
-                keyBuffer.put(Bytes.toBytes(reducerIndex)[3]);
-                keyBuffer.put(Bytes.toBytes(fieldValue));
-                outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
-                sortableKey.setText(outputKey);
-                //judge type
-                sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
-                context.write(sortableKey, EMPTY_TEXT);
+        for (int i = 0; i < factDictCols.size(); i++) {
+            String fieldValue = row[dictionaryColumnIndex[i]];
+            if (fieldValue == null)
+                continue;
+
+            int reducerIndex;
+            if (uhcIndex[i] == 0) {
+                //for the normal dictionary column
+                reducerIndex = columnIndexToReducerBeginId.get(i);
+            } else {
+                //for the uhc
+                reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+            }
+
+            tmpbuf.clear();
+            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+            tmpbuf.put(Bytes.toBytes(fieldValue));
+            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+            sortableKey.setText(outputKey);
+            //judge type
+            sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
+            context.write(sortableKey, EMPTY_TEXT);
+
+            // log a few rows for troubleshooting
+            if (rowCount < 10) {
+                logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
             }
-        } catch (Exception ex) {
-            handleErrorRecord(row, ex);
         }
 
         if (collectStatistics) {
-            if (rowCount < samplingPercentage) {
+            if (rowCount % 100 < samplingPercentage) {
                 putRowKeyToHLL(row);
             }
 
             if (needFetchPartitionCol == true) {
                 String fieldValue = row[partitionColumnIndex];
                 if (fieldValue != null) {
-                    int offset = keyBuffer.position();
-                    keyBuffer.put(MARK_FOR_PARTITION_COL);
-                    keyBuffer.put(Bytes.toBytes(fieldValue));
-                    outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
+                    tmpbuf.clear();
+                    tmpbuf.put(MARK_FOR_PARTITION_COL);
+                    tmpbuf.put(Bytes.toBytes(fieldValue));
+                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
                     sortableKey.setText(outputKey);
                     sortableKey.setTypeId((byte) 0);
                     context.write(sortableKey, EMPTY_TEXT);
                 }
             }
         }
-
-        if (rowCount++ == 100)
-            rowCount = 0;
+        rowCount++;
     }
 
     private void putRowKeyToHLL(String[] row) {
@@ -211,10 +214,10 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
             for (int i = 0; i < cuboidIds.length; i++) {
                 hll = allCuboidsHLL[i];
 
-                keyBuffer.clear();
-                keyBuffer.put(MARK_FOR_HLL); // one byte
-                keyBuffer.putLong(cuboidIds[i]);
-                outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+                tmpbuf.clear();
+                tmpbuf.put(MARK_FOR_HLL); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
                 hllBuf.clear();
                 hll.writeRegisters(hllBuf);
                 outputValue.set(hllBuf.array(), 0, hllBuf.position());

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index d3becfe..dcc9190 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -59,7 +59,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
         segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
-        segment.setIndexPath(CubingExecutableUtil.getIndexPath(this.getParams()));
         segment.setLastBuildTime(System.currentTimeMillis());
         segment.setSizeKB(cubeSizeBytes / 1024);
         segment.setInputRecords(sourceCount);
@@ -81,8 +80,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
         final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
-        Path outputFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+        Path outputFile = new Path(factColumnsInputPath, partitionCol.getIdentity() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
         FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString());
         FSDataInputStream is = null;
         long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;

http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index d2fa73e..add5c42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -75,7 +75,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         mergedSegment.setInputRecords(sourceCount);
         mergedSegment.setInputRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
-        mergedSegment.setIndexPath(CubingExecutableUtil.getIndexPath(this.getParams()));
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
 
         try {

Reply via email to