Repository: carbondata
Updated Branches:
  refs/heads/master 786db2171 -> 5443b227b


[CARBONDATA-2947] Adaptive encoding support for timestamp no dictionary and 
Refactor ColumnPageWrapper

Support adaptive encoding for Timestamp data type in case of no dictionary 
column

This closes #2736


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5443b227
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5443b227
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5443b227

Branch: refs/heads/master
Commit: 5443b227b806eecc81299f3c50d54d0b66fb00bb
Parents: 786db21
Author: dhatchayani <[email protected]>
Authored: Wed Sep 19 19:09:04 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Tue Sep 25 16:08:39 2018 +0530

----------------------------------------------------------------------
 .../datastore/chunk/DimensionColumnPage.java    | 12 +++
 .../chunk/impl/AbstractDimensionColumnPage.java | 10 +++
 .../chunk/store/ColumnPageWrapper.java          | 91 ++++++++++++++++++--
 .../page/encoding/DefaultEncodingFactory.java   |  3 +-
 .../core/datastore/row/WriteStepRowUtil.java    |  7 ++
 .../core/scan/executor/util/QueryUtil.java      | 32 +++++++
 .../carbondata/core/scan/filter/FilterUtil.java | 19 ++--
 ...RowLevelRangeLessThanFilterExecuterImpl.java |  8 +-
 .../carbondata/core/util/CarbonUnsafeUtil.java  |  4 +-
 .../carbondata/core/util/DataTypeUtil.java      | 80 +++++++++++++++--
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |  2 +-
 .../datamap/bloom/DataConvertUtil.java          |  2 +-
 .../datamap/IndexDataMapRebuildRDD.scala        | 11 ++-
 .../TestStreamingTableOperation.scala           |  2 +-
 .../converter/impl/FieldEncoderFactory.java     | 22 +++--
 .../impl/MeasureFieldConverterImpl.java         | 45 +++++++---
 .../converter/impl/RowConverterImpl.java        | 17 +++-
 .../loading/sort/SortStepRowHandler.java        | 11 ++-
 .../merger/CompactionResultSortProcessor.java   | 44 +++++++++-
 .../carbondata/processing/store/TablePage.java  | 16 +++-
 .../streaming/CarbonStreamRecordWriter.java     |  3 +-
 21 files changed, 385 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java
index 50fa09a..fa2b73e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.core.datastore.chunk;
 
+import java.util.BitSet;
+
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
@@ -102,4 +104,14 @@ public interface DimensionColumnPage {
    */
   void freeMemory();
 
+  /**
+   * to check whether the page is adaptive encoded
+   */
+  boolean isAdaptiveEncoded();
+
+  /**
+   * to get the null bit sets in case of adaptive encoded page
+   */
+  BitSet getNullBits();
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
index d400952..fdf57a9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.core.datastore.chunk.impl;
 
+import java.util.BitSet;
+
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import 
org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 
@@ -44,6 +46,14 @@ public abstract class AbstractDimensionColumnPage implements 
DimensionColumnPage
     return dataChunkStore.isExplicitSorted();
   }
 
+  @Override public boolean isAdaptiveEncoded() {
+    return false;
+  }
+
+  @Override public BitSet getNullBits() {
+    return null;
+  }
+
   /**
    * Below method to get the data based in row id
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index 71cfc46..098287e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -18,6 +18,8 @@
 package org.apache.carbondata.core.datastore.chunk.store;
 
 
+import java.util.BitSet;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
@@ -102,9 +104,7 @@ public class ColumnPageWrapper implements 
DimensionColumnPage {
       if (isExplicitSorted) {
         rowId = invertedReverseIndex[rowId];
       }
-      byte[] value = getChunkData(rowId, true);
-      int length = value.length;
-      QueryUtil.putDataToVector(vector, value, vectorRow, length);
+      QueryUtil.putDataToVector(vector, getActualData(rowId, true), vectorRow);
     }
   }
 
@@ -132,7 +132,7 @@ public class ColumnPageWrapper implements 
DimensionColumnPage {
     if (null != localDictionary) {
       return localDictionary
           
.getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 
0, 3));
-    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && 
this.isAdaptivePrimitive()) || (
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && 
isAdaptiveEncoded()) || (
         columnType == ColumnType.PLAIN_VALUE && 
DataTypeUtil.isPrimitiveColumn(srcDataType))) {
       if (!isRowIdChanged && columnPage.getNullBits().get(rowId)
           && columnType == ColumnType.COMPLEX_PRIMITIVE) {
@@ -181,7 +181,7 @@ public class ColumnPageWrapper implements 
DimensionColumnPage {
       } else {
         throw new RuntimeException("unsupported type: " + targetDataType);
       }
-    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && 
!this.isAdaptivePrimitive())) {
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && 
!isAdaptiveEncoded())) {
       if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
         return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
       }
@@ -205,6 +205,81 @@ public class ColumnPageWrapper implements 
DimensionColumnPage {
     }
   }
 
+  private Object getActualData(int rowId, boolean isRowIdChanged) {
+    ColumnType columnType = columnPage.getColumnSpec().getColumnType();
+    DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
+    DataType targetDataType = columnPage.getDataType();
+    if (null != localDictionary) {
+      return localDictionary
+          
.getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 
0, 3));
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && 
this.isAdaptiveEncoded()) || (
+        columnType == ColumnType.PLAIN_VALUE && 
DataTypeUtil.isPrimitiveColumn(srcDataType))) {
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)
+          && columnType == ColumnType.COMPLEX_PRIMITIVE) {
+        // if this row is null, return default null represent in byte array
+        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      }
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
+        // if this row is null, return default null represent in byte array
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
+      if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) {
+        double doubleData = columnPage.getDouble(rowId);
+        if (srcDataType == DataTypes.FLOAT) {
+          return (float) doubleData;
+        } else {
+          return doubleData;
+        }
+      } else if (DataTypes.isDecimal(srcDataType)) {
+        throw new RuntimeException("unsupported type: " + srcDataType);
+      } else if ((srcDataType == DataTypes.BYTE) || (srcDataType == 
DataTypes.BOOLEAN) || (
+          srcDataType == DataTypes.SHORT) || (srcDataType == 
DataTypes.SHORT_INT) || (srcDataType
+          == DataTypes.INT) || (srcDataType == DataTypes.LONG) || (srcDataType
+          == DataTypes.TIMESTAMP)) {
+        long longData = columnPage.getLong(rowId);
+        if ((srcDataType == DataTypes.BYTE)) {
+          return (byte) longData;
+        } else if (srcDataType == DataTypes.BOOLEAN) {
+          byte out = (byte) longData;
+          return ByteUtil.toBoolean(out);
+        } else if (srcDataType == DataTypes.SHORT) {
+          return (short) longData;
+        } else if (srcDataType == DataTypes.SHORT_INT) {
+          return (int) longData;
+        } else if (srcDataType == DataTypes.INT) {
+          return (int) longData;
+        } else {
+          // timestamp and long
+          return longData;
+        }
+      } else if ((targetDataType == DataTypes.STRING) || (targetDataType == 
DataTypes.VARCHAR) || (
+          targetDataType == DataTypes.BYTE_ARRAY)) {
+        return columnPage.getBytes(rowId);
+      } else {
+        throw new RuntimeException("unsupported type: " + targetDataType);
+      }
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && 
!this.isAdaptiveEncoded())) {
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
+      if ((srcDataType == DataTypes.BYTE) || (srcDataType == 
DataTypes.BOOLEAN)) {
+        byte[] out = new byte[1];
+        out[0] = (columnPage.getByte(rowId));
+        return ByteUtil.toBoolean(out);
+      } else if (srcDataType == DataTypes.BYTE_ARRAY) {
+        return columnPage.getBytes(rowId);
+      } else if (srcDataType == DataTypes.DOUBLE) {
+        return columnPage.getDouble(rowId);
+      } else if (srcDataType == targetDataType) {
+        return columnPage.getBytes(rowId);
+      } else {
+        throw new RuntimeException("unsupported type: " + targetDataType);
+      }
+    } else {
+      return columnPage.getBytes(rowId);
+    }
+  }
+
   @Override
   public int getInvertedIndex(int rowId) {
     return invertedIndex[rowId];
@@ -239,8 +314,12 @@ public class ColumnPageWrapper implements 
DimensionColumnPage {
     }
   }
 
-  public boolean isAdaptivePrimitive() {
+  @Override public boolean isAdaptiveEncoded() {
     return isAdaptivePrimitivePage;
   }
 
+  @Override public BitSet getNullBits() {
+    return columnPage.getNullBits();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 9e8d853..146d5dd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -121,7 +121,8 @@ public class DefaultEncodingFactory extends EncodingFactory 
{
     } else if (dataType == DataTypes.BYTE ||
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.INT ||
-        dataType == DataTypes.LONG) {
+        dataType == DataTypes.LONG ||
+        dataType == DataTypes.TIMESTAMP) {
       return selectCodecByAlgorithmForIntegral(stats, false, 
columnSpec).createEncoder(null);
     } else if (DataTypes.isDecimal(dataType)) {
       return createEncoderForDecimalDataTypeMeasure(columnPage, columnSpec);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
index 3d9de56..fe4e10e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.row;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -68,6 +69,12 @@ public class WriteStepRowUtil {
         noDictKeys[i] = DataTypeUtil
             .getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeys[i],
                 noDicAndComplexColumns[i].getDataType());
+        // for timestamp the above method will give the original data, so it 
should be
+        // converted again to the format to be loaded (without micros)
+        if (null != noDictKeys[i]
+            && noDicAndComplexColumns[i].getDataType() == DataTypes.TIMESTAMP) 
{
+          noDictKeys[i] = (long) noDictKeys[i] / 1000L;
+        }
       } else {
         noDictKeys[i] = noDictionaryKeys[i];
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index efe3e55..9fb0857 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -783,4 +783,36 @@ public class QueryUtil {
       }
     }
   }
+
+  /**
+   * Put the data to vector
+   *
+   * @param vector
+   * @param value
+   * @param vectorRow
+   */
+  public static void putDataToVector(CarbonColumnVector vector, Object value, 
int vectorRow) {
+    DataType dt = vector.getType();
+    if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY) || value
+        .equals(CarbonCommonConstants.EMPTY_BYTE_ARRAY)) {
+      vector.putNull(vectorRow);
+    } else {
+      if (dt == DataTypes.STRING) {
+        vector.putBytes(vectorRow, (byte[]) value);
+      } else if (dt == DataTypes.BOOLEAN) {
+        vector.putBoolean(vectorRow, (boolean) value);
+      } else if (dt == DataTypes.BYTE) {
+        vector.putByte(vectorRow, (byte) value);
+      } else if (dt == DataTypes.SHORT) {
+        vector.putShort(vectorRow, (short) value);
+      } else if (dt == DataTypes.INT) {
+        vector.putInt(vectorRow, (int) value);
+      } else if (dt == DataTypes.LONG) {
+        vector.putLong(vectorRow,
+            DataTypeUtil.getDataBasedOnRestructuredDataType(value, 
vector.getBlockDataType()));
+      } else if (dt == DataTypes.TIMESTAMP) {
+        vector.putLong(vectorRow, (long) value * 1000L);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index ba6a033..b4354d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1944,12 +1944,21 @@ public final class FilterUtil {
    * @param dimensionColumnPage
    * @param bitSet
    */
-  public static void removeNullValues(DimensionColumnPage dimensionColumnPage,
-      BitSet bitSet, byte[] defaultValue) {
+  public static void removeNullValues(DimensionColumnPage dimensionColumnPage, 
BitSet bitSet,
+      byte[] defaultValue) {
     if (!bitSet.isEmpty()) {
-      for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) 
{
-        if (dimensionColumnPage.compareTo(i, defaultValue) == 0) {
-          bitSet.flip(i);
+      if (null != dimensionColumnPage.getNullBits() && 
!dimensionColumnPage.getNullBits().isEmpty()
+          && !dimensionColumnPage.isExplicitSorted() && 
!dimensionColumnPage.isAdaptiveEncoded()) {
+        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 
1)) {
+          if (dimensionColumnPage.getNullBits().get(i)) {
+            bitSet.flip(i);
+          }
+        }
+      } else {
+        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 
1)) {
+          if (dimensionColumnPage.compareTo(i, defaultValue) == 0) {
+            bitSet.flip(i);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
index f0773d5..7c48180 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
@@ -178,16 +178,14 @@ public class RowLevelRangeLessThanFilterExecuterImpl 
extends RowLevelFilterExecu
           .compareTo(filterValues[k], CarbonCommonConstants.EMPTY_BYTE_ARRAY) 
== 0) {
         return true;
       }
-      // filter value should be in range of max and min value i.e
-      // max>filtervalue>min
-      // so filter-max should be negative
+      // so filter-min should be positive
       Object data =
           
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValues[k], 
dataType);
       SerializableComparator comparator = Comparator.getComparator(dataType);
       int minCompare = comparator.compare(data, minValue);
       // if any filter value is in range than this block needs to be
-      // scanned less than equal to max range.
-      if (minCompare >= 0) {
+      // scanned less than min range.
+      if (minCompare > 0) {
         isScanRequired = true;
         break;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
index 28cec5f..e383196 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
@@ -44,7 +44,7 @@ public class CarbonUnsafeUtil {
       CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 
data);
     } else if (dataType == DataTypes.INT) {
       CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, (int) data);
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, (long) 
data);
     } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.DOUBLE) {
       CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, (double) 
data);
@@ -79,7 +79,7 @@ public class CarbonUnsafeUtil {
       data = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
     } else if (dataType == DataTypes.INT) {
       data = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       data = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
     } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.DOUBLE) {
       data = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 4059316..fbcbee5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -78,7 +78,6 @@ public final class DataTypeUtil {
    *
    * @param msrValue
    * @param dataType
-   * @param carbonMeasure
    * @return
    */
   public static Object getMeasureValueBasedOnDataType(String msrValue, 
DataType dataType,
@@ -91,7 +90,6 @@ public final class DataTypeUtil {
    *
    * @param msrValue
    * @param dataType
-   * @param carbonMeasure
    * @return
    */
   public static Object getMeasureValueBasedOnDataType(String msrValue, 
DataType dataType,
@@ -126,6 +124,60 @@ public final class DataTypeUtil {
     }
   }
 
+  /**
+   * This method will convert a given value to its specific type
+   *
+   * @param dimValue
+   * @param dataType
+   * @return
+   */
+  public static Object getNoDictionaryValueBasedOnDataType(String dimValue, 
DataType dataType,
+      int scale, int precision, boolean useConverter, String timeStampFormat) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return BooleanConvert.parseBoolean(dimValue);
+    } else if (DataTypes.isDecimal(dataType)) {
+      BigDecimal bigDecimal =
+          new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
+      BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
+      if (useConverter) {
+        return converter.convertFromBigDecimalToDecimal(decimal);
+      } else {
+        return decimal;
+      }
+    } else if (dataType == DataTypes.SHORT) {
+      return Short.parseShort(dimValue);
+    } else if (dataType == DataTypes.INT) {
+      return Integer.parseInt(dimValue);
+    } else if (dataType == DataTypes.LONG) {
+      return Long.valueOf(dimValue);
+    } else if (dataType == DataTypes.FLOAT) {
+      return Float.parseFloat(dimValue);
+    } else if (dataType == DataTypes.BYTE) {
+      return Byte.parseByte(dimValue);
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      Date dateToStr = null;
+      DateFormat dateFormatter = null;
+      try {
+        if (null != timeStampFormat && !timeStampFormat.trim().isEmpty()) {
+          dateFormatter = new SimpleDateFormat(timeStampFormat);
+          dateFormatter.setLenient(false);
+        } else {
+          dateFormatter = timeStampformatter.get();
+        }
+        dateToStr = dateFormatter.parse(dimValue);
+        return dateToStr.getTime();
+      } catch (ParseException e) {
+        throw new NumberFormatException(e.getMessage());
+      }
+    } else {
+      Double parsedValue = Double.valueOf(dimValue);
+      if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) {
+        return null;
+      }
+      return parsedValue;
+    }
+  }
+
   public static Object getMeasureObjectFromDataType(byte[] data, DataType 
dataType) {
     if (data == null || data.length == 0) {
       return null;
@@ -1032,6 +1084,24 @@ public final class DataTypeUtil {
   }
 
   /**
+   * Method to type case the data based on modified data type. This method 
will used for
+   * retrieving the data after change in data type restructure operation
+   *
+   * @param data
+   * @param restructureDataType
+   * @return
+   */
+  public static long getDataBasedOnRestructuredDataType(Object data, DataType 
restructureDataType) {
+    long value = 0L;
+    if (restructureDataType == DataTypes.INT) {
+      value = (int) data;
+    } else if (restructureDataType == DataTypes.LONG) {
+      value = (long) data;
+    }
+    return value;
+  }
+
+  /**
    * Check if the column is a no dictionary primitive column
    *
    * @param dataType
@@ -1039,9 +1109,9 @@ public final class DataTypeUtil {
    */
   public static boolean isPrimitiveColumn(DataType dataType) {
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE || 
dataType == DataTypes.SHORT
-        || dataType == DataTypes.INT || dataType == DataTypes.LONG || 
DataTypes.isDecimal(dataType)
-        || dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE
-        || dataType == DataTypes.BYTE_ARRAY) {
+        || dataType == DataTypes.INT || dataType == DataTypes.LONG
+        || dataType == DataTypes.TIMESTAMP || DataTypes.isDecimal(dataType)
+        || dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 2e2d94b..344ec09 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -140,7 +140,7 @@ public class BloomCoarseGrainDataMap extends 
CoarseGrainDataMap {
         dataField.setTimestampFormat(tsFormat);
         FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
             .createFieldEncoder(dataField, absoluteTableIdentifier, i, 
nullFormat, null, false,
-                localCaches[i], false, parentTablePath);
+                localCaches[i], false, parentTablePath, false);
         this.name2Converters.put(indexedColumn.get(i).getColName(), 
fieldConverter);
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
index f59202d..c639905 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
@@ -61,7 +61,7 @@ public class DataConvertUtil {
       return (short) 0;
     } else if (dataType == DataTypes.INT) {
       return 0;
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       return 0L;
     } else if (dataType == DataTypes.DOUBLE) {
       return 0.0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 1897c87..e3fec10 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -269,8 +269,17 @@ class RawBytesReadSupport(segmentProperties: 
SegmentProperties, indexColumns: Ar
         // no dictionary primitive columns are expected to be in original data 
while loading,
         // so convert it to original data
         if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) {
-          val dataFromBytes = DataTypeUtil
+          var dataFromBytes = DataTypeUtil
             .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, 
col.getDataType)
+          if (dataFromBytes == null) {
+            dataFromBytes = DataConvertUtil
+              .getNullValueForMeasure(col.getDataType, 
col.getColumnSchema.getScale)
+          }
+          // for timestamp the above method will give the original data, so it 
should be
+          // converted again to the format to be loaded (without micros)
+          if (null != dataFromBytes && col.getDataType == DataTypes.TIMESTAMP) 
{
+            dataFromBytes = (dataFromBytes.asInstanceOf[Long] / 
1000L).asInstanceOf[Object];
+          }
           dataFromBytes
         } else {
           bytes

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index baf4664..c4e3517 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1003,7 +1003,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(
       sql("select * from stream_table_filter where updated is null"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
-    assert(1 == partitionNums("select * from stream_table_filter where updated 
is null"))
+    assert(3 == partitionNums("select * from stream_table_filter where updated 
is null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and updated is 
not null"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 7dfe95f..435cf24 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -58,16 +58,24 @@ public class FieldEncoderFactory {
   /**
    * Creates the FieldConverter for all dimensions, for measures return null.
    *
-   * @param dataField             column schema
-   * @param absoluteTableIdentifier table identifier
-   * @param index                 index of column in the row.
+   * @param dataField                 column schema
+   * @param absoluteTableIdentifier   table identifier
+   * @param index                     index of column in the row
+   * @param nullFormat                null format of the field
+   * @param client
+   * @param useOnePass
+   * @param localCache
    * @param isEmptyBadRecord
+   * @param parentTablePath
+   * @param isConvertToBinary     whether the no dictionary field is to be 
converted to binary or not
    * @return
+   * @throws IOException
    */
   public FieldConverter createFieldEncoder(DataField dataField,
       AbsoluteTableIdentifier absoluteTableIdentifier, int index, String 
nullFormat,
       DictionaryClient client, Boolean useOnePass, Map<Object, Integer> 
localCache,
-      boolean isEmptyBadRecord, String parentTablePath) throws IOException {
+      boolean isEmptyBadRecord, String parentTablePath, boolean 
isConvertToBinary)
+      throws IOException {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimension()) {
       if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -112,9 +120,11 @@ public class FieldEncoderFactory {
             createComplexDataType(dataField, absoluteTableIdentifier,
                 client, useOnePass, localCache, index, nullFormat, 
isEmptyBadRecord), index);
       } else {
-        // if the no dictionary column is a numeric column then treat is as 
measure col
+        // if the no dictionary column is a numeric column and no need to 
convert to binary
+        // then treat it as a measure col
         // so that the adaptive encoding can be applied on it easily
-        if 
(DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())) {
+        if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())
+            && !isConvertToBinary) {
           return new MeasureFieldConverterImpl(dataField, nullFormat, index, 
isEmptyBadRecord);
         }
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, 
index, isEmptyBadRecord);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 20278e4..212037b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -20,6 +20,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
@@ -71,6 +72,9 @@ public class MeasureFieldConverterImpl implements 
FieldConverter {
             dataField.getColumn().getDataType());
         
logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), 
message);
       }
+      if (dataField.getColumn().isDimension()) {
+        logHolder.setReason(message);
+      }
       return null;
     } else if (literalValue.length() == 0) {
       if (isEmptyBadRecord) {
@@ -87,21 +91,42 @@ public class MeasureFieldConverterImpl implements 
FieldConverter {
       return null;
     } else {
       try {
-        if (dataField.isUseActualData()) {
-          output = DataTypeUtil
-              .getMeasureValueBasedOnDataType(literalValue, 
dataField.getColumn().getDataType(),
-                  dataField.getColumn().getColumnSchema().getScale(),
-                  dataField.getColumn().getColumnSchema().getPrecision(), 
true);
+        // in case of no dictionary dimension
+        if (dataField.getColumn().isDimension()) {
+          String dateFormat = null;
+          if (dataField.getColumn().getDataType() == DataTypes.DATE) {
+            dateFormat = dataField.getDateFormat();
+          } else if (dataField.getColumn().getDataType() == 
DataTypes.TIMESTAMP) {
+            dateFormat = dataField.getTimestampFormat();
+          }
+          if (dataField.isUseActualData()) {
+            output = 
DataTypeUtil.getNoDictionaryValueBasedOnDataType(literalValue,
+                dataField.getColumn().getDataType(),
+                dataField.getColumn().getColumnSchema().getScale(),
+                dataField.getColumn().getColumnSchema().getPrecision(), true, 
dateFormat);
+          } else {
+            output = 
DataTypeUtil.getNoDictionaryValueBasedOnDataType(literalValue,
+                dataField.getColumn().getDataType(),
+                dataField.getColumn().getColumnSchema().getScale(),
+                dataField.getColumn().getColumnSchema().getPrecision(), false, 
dateFormat);
+          }
         } else {
-          output = DataTypeUtil
-              .getMeasureValueBasedOnDataType(literalValue, 
dataField.getColumn().getDataType(),
-                  dataField.getColumn().getColumnSchema().getScale(),
-                  dataField.getColumn().getColumnSchema().getPrecision());
+          if (dataField.isUseActualData()) {
+            output = DataTypeUtil
+                .getMeasureValueBasedOnDataType(literalValue, 
dataField.getColumn().getDataType(),
+                    dataField.getColumn().getColumnSchema().getScale(),
+                    dataField.getColumn().getColumnSchema().getPrecision(), 
true);
+          } else {
+            output = DataTypeUtil
+                .getMeasureValueBasedOnDataType(literalValue, 
dataField.getColumn().getDataType(),
+                    dataField.getColumn().getColumnSchema().getScale(),
+                    dataField.getColumn().getColumnSchema().getPrecision());
+          }
         }
         return output;
       } catch (NumberFormatException e) {
         if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("Can not convert value to Numeric type value. Value 
considered as null.");
+          LOGGER.debug("Cannot convert value to Numeric type value. Value 
considered as null.");
         }
         logHolder.setReason(CarbonDataProcessorUtil
             .prepareFailureReason(dataField.getColumn().getColName(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index a5e5138..2d4e167 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -69,6 +69,8 @@ public class RowConverterImpl implements RowConverter {
 
   private Map<Object, Integer>[] localCaches;
 
+  private boolean isConvertToBinary;
+
   public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration 
configuration,
       BadRecordsLogger badRecordLogger) {
     this.fields = fields;
@@ -76,6 +78,14 @@ public class RowConverterImpl implements RowConverter {
     this.badRecordLogger = badRecordLogger;
   }
 
+  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration 
configuration,
+      BadRecordsLogger badRecordLogger, boolean isConvertToBinary) {
+    this.fields = fields;
+    this.configuration = configuration;
+    this.badRecordLogger = badRecordLogger;
+    this.isConvertToBinary = isConvertToBinary;
+  }
+
   @Override
   public void initialize() throws IOException {
     String nullFormat =
@@ -95,7 +105,7 @@ public class RowConverterImpl implements RowConverter {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], configuration.getTableIdentifier(), 
i, nullFormat, client,
               configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
-              configuration.getParentTablePath());
+              configuration.getParentTablePath(), isConvertToBinary);
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -199,7 +209,8 @@ public class RowConverterImpl implements RowConverter {
   @Override
   public RowConverter createCopyForNewThread() {
     RowConverterImpl converter =
-        new RowConverterImpl(this.fields, this.configuration, 
this.badRecordLogger);
+        new RowConverterImpl(this.fields, this.configuration, 
this.badRecordLogger,
+            this.isConvertToBinary);
     List<FieldConverter> fieldConverterList = new ArrayList<>();
     DictionaryClient client = createDictionaryClient();
     dictClients.add(client);
@@ -215,7 +226,7 @@ public class RowConverterImpl implements RowConverter {
         fieldConverter = FieldEncoderFactory.getInstance()
             .createFieldEncoder(fields[i], configuration.getTableIdentifier(), 
i, nullFormat,
                 client, configuration.getUseOnePass(), localCaches[i], 
isEmptyBadRecord,
-                configuration.getParentTablePath());
+                configuration.getParentTablePath(), isConvertToBinary);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 1262fde..fa12dcc 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -300,7 +300,7 @@ public class SortStepRowHandler implements Serializable {
       data = inputStream.readShort();
     } else if (dataType == DataTypes.INT) {
       data = inputStream.readInt();
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       data = inputStream.readLong();
     } else if (dataType == DataTypes.DOUBLE) {
       data = inputStream.readDouble();
@@ -384,7 +384,7 @@ public class SortStepRowHandler implements Serializable {
       tmpContent = rowBuffer.getShort();
     } else if (DataTypes.INT == tmpDataType) {
       tmpContent = rowBuffer.getInt();
-    } else if (DataTypes.LONG == tmpDataType) {
+    } else if (DataTypes.LONG == tmpDataType || DataTypes.TIMESTAMP == 
tmpDataType) {
       tmpContent = rowBuffer.getLong();
     } else if (DataTypes.DOUBLE == tmpDataType) {
       tmpContent = rowBuffer.getDouble();
@@ -501,7 +501,7 @@ public class SortStepRowHandler implements Serializable {
         outputStream.writeShort((short) data);
       } else if (dataType == DataTypes.INT) {
         outputStream.writeInt((int) data);
-      } else if (dataType == DataTypes.LONG) {
+      } else if (dataType == DataTypes.LONG || dataType == 
DataTypes.TIMESTAMP) {
         outputStream.writeLong((long) data);
       } else if (dataType == DataTypes.DOUBLE) {
         outputStream.writeDouble((double) data);
@@ -715,6 +715,9 @@ public class SortStepRowHandler implements Serializable {
           size += 2;
         } else {
           int sizeInBytes = this.noDictSortDataTypes[idx].getSizeInBytes();
+          if (this.noDictSortDataTypes[idx] == DataTypes.TIMESTAMP) {
+            sizeInBytes = DataTypes.LONG.getSizeInBytes();
+          }
           CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, 
(short) sizeInBytes);
           size += 2;
           // put data to unsafe according to the data types
@@ -829,7 +832,7 @@ public class SortStepRowHandler implements Serializable {
       reUsableByteArrayDataOutputStream.writeShort((Short) tmpValue);
     } else if (DataTypes.INT == tmpDataType) {
       reUsableByteArrayDataOutputStream.writeInt((Integer) tmpValue);
-    } else if (DataTypes.LONG == tmpDataType) {
+    } else if (DataTypes.LONG == tmpDataType || DataTypes.TIMESTAMP == 
tmpDataType) {
       reUsableByteArrayDataOutputStream.writeLong((Long) tmpValue);
     } else if (DataTypes.DOUBLE == tmpDataType) {
       reUsableByteArrayDataOutputStream.writeDouble((Double) tmpValue);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 1aa6da8..6133016 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -231,7 +231,7 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
       if (CompactionType.STREAMING == compactionType) {
         while (resultIterator.hasNext()) {
           // the input iterator of streaming segment is already using raw row
-          addRowForSorting(resultIterator.next());
+          
addRowForSorting(prepareStreamingRowObjectForSorting(resultIterator.next()));
           isRecordFound = true;
         }
       } else {
@@ -256,6 +256,43 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
    * @param row
    * @return
    */
+  private Object[] prepareStreamingRowObjectForSorting(Object[] row) {
+    List<CarbonDimension> dimensions = segmentProperties.getDimensions();
+    Object[] preparedRow = new Object[dimensions.size() + measureCount];
+    for (int i = 0; i < dimensions.size(); i++) {
+      CarbonDimension dims = dimensions.get(i);
+      if (dims.hasEncoding(Encoding.DICTIONARY)) {
+        // dictionary
+        preparedRow[i] = row[i];
+      } else {
+        // no dictionary dims
+        if (DataTypeUtil.isPrimitiveColumn(dims.getDataType()) && 
!dims.isComplex()) {
+          // no dictionary measure columns are expected as original data
+          preparedRow[i] = DataTypeUtil
+              .getDataBasedOnDataTypeForNoDictionaryColumn((byte[]) row[i], 
dims.getDataType());
+          // for timestamp the above method will give the original data, so it 
should be
+          // converted again to the format to be loaded (without micros)
+          if (null != preparedRow[i] && dims.getDataType() == 
DataTypes.TIMESTAMP) {
+            preparedRow[i] = (long) preparedRow[i] / 1000L;
+          }
+        } else {
+          preparedRow[i] = row[i];
+        }
+      }
+    }
+    // fill all the measures
+    for (int i = 0; i < measureCount; i++) {
+      preparedRow[dimensionColumnCount + i] = row[dimensionColumnCount + i];
+    }
+    return preparedRow;
+  }
+
+  /**
+   * This method will prepare the data from raw object that will take part in 
sorting
+   *
+   * @param row
+   * @return
+   */
   private Object[] prepareRowObjectForSorting(Object[] row) {
     ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
     // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount];
@@ -283,6 +320,11 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
           preparedRow[i] = DataTypeUtil
               
.getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex,
                   dims.getDataType());
+          // for timestamp the above method will give the original data, so it 
should be
+          // converted again to the format to be loaded (without micros)
+          if (null != preparedRow[i] && dims.getDataType() == 
DataTypes.TIMESTAMP) {
+            preparedRow[i] = (long) preparedRow[i] / 1000L;
+          }
         } else {
           preparedRow[i] = noDictionaryKeyByIndex;
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 2f49ef2..791b4c6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -131,8 +131,13 @@ public class TablePage {
               columnPageEncoderMeta, pageSize, localDictionaryGenerator, 
false);
         } else {
           if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
-            columnPageEncoderMeta =
-                new ColumnPageEncoderMeta(spec, spec.getSchemaDataType(), 
columnCompressor);
+            if (spec.getSchemaDataType() == DataTypes.TIMESTAMP) {
+              columnPageEncoderMeta =
+                  new ColumnPageEncoderMeta(spec, DataTypes.LONG, 
columnCompressor);
+            } else {
+              columnPageEncoderMeta =
+                  new ColumnPageEncoderMeta(spec, spec.getSchemaDataType(), 
columnCompressor);
+            }
             // create the column page according to the data type for no 
dictionary numeric columns
             if (DataTypes.isDecimal(spec.getSchemaDataType())) {
               page = ColumnPage.newDecimalPage(columnPageEncoderMeta, 
pageSize);
@@ -147,7 +152,12 @@ public class TablePage {
         if (DataTypes.VARCHAR == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
         } else if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
-          
page.setStatsCollector(PrimitivePageStatsCollector.newInstance(spec.getSchemaDataType()));
+          if (spec.getSchemaDataType() == DataTypes.TIMESTAMP) {
+            
page.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataTypes.LONG));
+          } else {
+            page.setStatsCollector(
+                
PrimitivePageStatsCollector.newInstance(spec.getSchemaDataType()));
+          }
         } else {
           page.setStatsCollector(LVShortStringStatsCollector.newInstance());
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index eeb9cc1..0d2a889 100644
--- 
a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -156,7 +156,8 @@ public class CarbonStreamRecordWriter extends 
RecordWriter<Void, Object> {
     // initialize parser and converter
     rowParser = new RowParserImpl(dataFields, configuration);
     badRecordLogger = 
BadRecordsLoggerProvider.createBadRecordLogger(configuration);
-    converter = new RowConverterImpl(configuration.getDataFields(), 
configuration, badRecordLogger);
+    converter =
+        new RowConverterImpl(configuration.getDataFields(), configuration, 
badRecordLogger, true);
     configuration.setCardinalityFinder(converter);
     converter.initialize();
 

Reply via email to