Repository: carbondata Updated Branches: refs/heads/master 786db2171 -> 5443b227b
[CARBONDATA-2947] Adaptive encoding support for timestamp no dictionary and Refactor ColumnPageWrapper. Support adaptive encoding for Timestamp data type in case of no dictionary column. This closes #2736. Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5443b227 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5443b227 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5443b227 Branch: refs/heads/master Commit: 5443b227b806eecc81299f3c50d54d0b66fb00bb Parents: 786db21 Author: dhatchayani <[email protected]> Authored: Wed Sep 19 19:09:04 2018 +0530 Committer: ravipesala <[email protected]> Committed: Tue Sep 25 16:08:39 2018 +0530 ---------------------------------------------------------------------- .../datastore/chunk/DimensionColumnPage.java | 12 +++ .../chunk/impl/AbstractDimensionColumnPage.java | 10 +++ .../chunk/store/ColumnPageWrapper.java | 91 ++++++++++++++++++-- .../page/encoding/DefaultEncodingFactory.java | 3 +- .../core/datastore/row/WriteStepRowUtil.java | 7 ++ .../core/scan/executor/util/QueryUtil.java | 32 +++++++ .../carbondata/core/scan/filter/FilterUtil.java | 19 ++-- ...RowLevelRangeLessThanFilterExecuterImpl.java | 8 +- .../carbondata/core/util/CarbonUnsafeUtil.java | 4 +- .../carbondata/core/util/DataTypeUtil.java | 80 +++++++++++++++-- .../datamap/bloom/BloomCoarseGrainDataMap.java | 2 +- .../datamap/bloom/DataConvertUtil.java | 2 +- .../datamap/IndexDataMapRebuildRDD.scala | 11 ++- .../TestStreamingTableOperation.scala | 2 +- .../converter/impl/FieldEncoderFactory.java | 22 +++-- .../impl/MeasureFieldConverterImpl.java | 45 +++++++--- .../converter/impl/RowConverterImpl.java | 17 +++- .../loading/sort/SortStepRowHandler.java | 11 ++- .../merger/CompactionResultSortProcessor.java | 44 +++++++++- .../carbondata/processing/store/TablePage.java | 16 +++- .../streaming/CarbonStreamRecordWriter.java | 3 +- 21 files changed, 385 insertions(+), 56 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java index 50fa09a..fa2b73e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnPage.java @@ -16,6 +16,8 @@ */ package org.apache.carbondata.core.datastore.chunk; +import java.util.BitSet; + import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; /** @@ -102,4 +104,14 @@ public interface DimensionColumnPage { */ void freeMemory(); + /** + * to check whether the page is adaptive encoded + */ + boolean isAdaptiveEncoded(); + + /** + * to get the null bit sets in case of adaptive encoded page + */ + BitSet getNullBits(); + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java index d400952..fdf57a9 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java @@ -16,6 +16,8 @@ */ package org.apache.carbondata.core.datastore.chunk.impl; +import java.util.BitSet; + import 
org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore; @@ -44,6 +46,14 @@ public abstract class AbstractDimensionColumnPage implements DimensionColumnPage return dataChunkStore.isExplicitSorted(); } + @Override public boolean isAdaptiveEncoded() { + return false; + } + + @Override public BitSet getNullBits() { + return null; + } + /** * Below method to get the data based in row id * http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java index 71cfc46..098287e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java @@ -18,6 +18,8 @@ package org.apache.carbondata.core.datastore.chunk.store; +import java.util.BitSet; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; @@ -102,9 +104,7 @@ public class ColumnPageWrapper implements DimensionColumnPage { if (isExplicitSorted) { rowId = invertedReverseIndex[rowId]; } - byte[] value = getChunkData(rowId, true); - int length = value.length; - QueryUtil.putDataToVector(vector, value, vectorRow, length); + QueryUtil.putDataToVector(vector, getActualData(rowId, true), vectorRow); } } @@ -132,7 +132,7 @@ public class ColumnPageWrapper implements DimensionColumnPage { if (null != localDictionary) { return localDictionary 
.getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3)); - } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptivePrimitive()) || ( + } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && isAdaptiveEncoded()) || ( columnType == ColumnType.PLAIN_VALUE && DataTypeUtil.isPrimitiveColumn(srcDataType))) { if (!isRowIdChanged && columnPage.getNullBits().get(rowId) && columnType == ColumnType.COMPLEX_PRIMITIVE) { @@ -181,7 +181,7 @@ public class ColumnPageWrapper implements DimensionColumnPage { } else { throw new RuntimeException("unsupported type: " + targetDataType); } - } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !this.isAdaptivePrimitive())) { + } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !isAdaptiveEncoded())) { if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) { return CarbonCommonConstants.EMPTY_BYTE_ARRAY; } @@ -205,6 +205,81 @@ public class ColumnPageWrapper implements DimensionColumnPage { } } + private Object getActualData(int rowId, boolean isRowIdChanged) { + ColumnType columnType = columnPage.getColumnSpec().getColumnType(); + DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType(); + DataType targetDataType = columnPage.getDataType(); + if (null != localDictionary) { + return localDictionary + .getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3)); + } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptiveEncoded()) || ( + columnType == ColumnType.PLAIN_VALUE && DataTypeUtil.isPrimitiveColumn(srcDataType))) { + if (!isRowIdChanged && columnPage.getNullBits().get(rowId) + && columnType == ColumnType.COMPLEX_PRIMITIVE) { + // if this row is null, return default null represent in byte array + return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } + if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) { + // if this row is null, return default null represent in byte array + return 
CarbonCommonConstants.EMPTY_BYTE_ARRAY; + } + if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) { + double doubleData = columnPage.getDouble(rowId); + if (srcDataType == DataTypes.FLOAT) { + return (float) doubleData; + } else { + return doubleData; + } + } else if (DataTypes.isDecimal(srcDataType)) { + throw new RuntimeException("unsupported type: " + srcDataType); + } else if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN) || ( + srcDataType == DataTypes.SHORT) || (srcDataType == DataTypes.SHORT_INT) || (srcDataType + == DataTypes.INT) || (srcDataType == DataTypes.LONG) || (srcDataType + == DataTypes.TIMESTAMP)) { + long longData = columnPage.getLong(rowId); + if ((srcDataType == DataTypes.BYTE)) { + return (byte) longData; + } else if (srcDataType == DataTypes.BOOLEAN) { + byte out = (byte) longData; + return ByteUtil.toBoolean(out); + } else if (srcDataType == DataTypes.SHORT) { + return (short) longData; + } else if (srcDataType == DataTypes.SHORT_INT) { + return (int) longData; + } else if (srcDataType == DataTypes.INT) { + return (int) longData; + } else { + // timestamp and long + return longData; + } + } else if ((targetDataType == DataTypes.STRING) || (targetDataType == DataTypes.VARCHAR) || ( + targetDataType == DataTypes.BYTE_ARRAY)) { + return columnPage.getBytes(rowId); + } else { + throw new RuntimeException("unsupported type: " + targetDataType); + } + } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !this.isAdaptiveEncoded())) { + if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) { + return CarbonCommonConstants.EMPTY_BYTE_ARRAY; + } + if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) { + byte[] out = new byte[1]; + out[0] = (columnPage.getByte(rowId)); + return ByteUtil.toBoolean(out); + } else if (srcDataType == DataTypes.BYTE_ARRAY) { + return columnPage.getBytes(rowId); + } else if (srcDataType == DataTypes.DOUBLE) { + return columnPage.getDouble(rowId); + } 
else if (srcDataType == targetDataType) { + return columnPage.getBytes(rowId); + } else { + throw new RuntimeException("unsupported type: " + targetDataType); + } + } else { + return columnPage.getBytes(rowId); + } + } + @Override public int getInvertedIndex(int rowId) { return invertedIndex[rowId]; @@ -239,8 +314,12 @@ public class ColumnPageWrapper implements DimensionColumnPage { } } - public boolean isAdaptivePrimitive() { + @Override public boolean isAdaptiveEncoded() { return isAdaptivePrimitivePage; } + @Override public BitSet getNullBits() { + return columnPage.getNullBits(); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java index 9e8d853..146d5dd 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java @@ -121,7 +121,8 @@ public class DefaultEncodingFactory extends EncodingFactory { } else if (dataType == DataTypes.BYTE || dataType == DataTypes.SHORT || dataType == DataTypes.INT || - dataType == DataTypes.LONG) { + dataType == DataTypes.LONG || + dataType == DataTypes.TIMESTAMP) { return selectCodecByAlgorithmForIntegral(stats, false, columnSpec).createEncoder(null); } else if (DataTypes.isDecimal(dataType)) { return createEncoderForDecimalDataTypeMeasure(columnPage, columnSpec); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java index 3d9de56..fe4e10e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java @@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.row; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; import org.apache.carbondata.core.util.DataTypeUtil; @@ -68,6 +69,12 @@ public class WriteStepRowUtil { noDictKeys[i] = DataTypeUtil .getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeys[i], noDicAndComplexColumns[i].getDataType()); + // for timestamp the above method will give the original data, so it should be + // converted again to the format to be loaded (without micros) + if (null != noDictKeys[i] + && noDicAndComplexColumns[i].getDataType() == DataTypes.TIMESTAMP) { + noDictKeys[i] = (long) noDictKeys[i] / 1000L; + } } else { noDictKeys[i] = noDictionaryKeys[i]; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java index efe3e55..9fb0857 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java @@ -783,4 +783,36 @@ public class QueryUtil { } } } + + /** + * Put the data to vector + * + * @param vector + * @param value + * @param vectorRow + */ + public static void putDataToVector(CarbonColumnVector vector, Object value, int vectorRow) { + DataType dt = vector.getType(); + if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY) || value + .equals(CarbonCommonConstants.EMPTY_BYTE_ARRAY)) { + vector.putNull(vectorRow); + } else { + if (dt == DataTypes.STRING) { + vector.putBytes(vectorRow, (byte[]) value); + } else if (dt == DataTypes.BOOLEAN) { + vector.putBoolean(vectorRow, (boolean) value); + } else if (dt == DataTypes.BYTE) { + vector.putByte(vectorRow, (byte) value); + } else if (dt == DataTypes.SHORT) { + vector.putShort(vectorRow, (short) value); + } else if (dt == DataTypes.INT) { + vector.putInt(vectorRow, (int) value); + } else if (dt == DataTypes.LONG) { + vector.putLong(vectorRow, + DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType())); + } else if (dt == DataTypes.TIMESTAMP) { + vector.putLong(vectorRow, (long) value * 1000L); + } + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java index ba6a033..b4354d2 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java @@ -1944,12 +1944,21 @@ public final class FilterUtil { * @param dimensionColumnPage * @param bitSet */ - public static void removeNullValues(DimensionColumnPage dimensionColumnPage, - BitSet bitSet, byte[] defaultValue) { + public static 
void removeNullValues(DimensionColumnPage dimensionColumnPage, BitSet bitSet, + byte[] defaultValue) { if (!bitSet.isEmpty()) { - for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { - if (dimensionColumnPage.compareTo(i, defaultValue) == 0) { - bitSet.flip(i); + if (null != dimensionColumnPage.getNullBits() && !dimensionColumnPage.getNullBits().isEmpty() + && !dimensionColumnPage.isExplicitSorted() && !dimensionColumnPage.isAdaptiveEncoded()) { + for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { + if (dimensionColumnPage.getNullBits().get(i)) { + bitSet.flip(i); + } + } + } else { + for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { + if (dimensionColumnPage.compareTo(i, defaultValue) == 0) { + bitSet.flip(i); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java index f0773d5..7c48180 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java @@ -178,16 +178,14 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu .compareTo(filterValues[k], CarbonCommonConstants.EMPTY_BYTE_ARRAY) == 0) { return true; } - // filter value should be in range of max and min value i.e - // max>filtervalue>min - // so filter-max should be negative + // so filter-min should be positive Object data = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValues[k], 
dataType); SerializableComparator comparator = Comparator.getComparator(dataType); int minCompare = comparator.compare(data, minValue); // if any filter value is in range than this block needs to be - // scanned less than equal to max range. - if (minCompare >= 0) { + // scanned less than min range. + if (minCompare > 0) { isScanRequired = true; break; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java index 28cec5f..e383196 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java @@ -44,7 +44,7 @@ public class CarbonUnsafeUtil { CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) data); } else if (dataType == DataTypes.INT) { CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, (int) data); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, (long) data); } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.DOUBLE) { CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, (double) data); @@ -79,7 +79,7 @@ public class CarbonUnsafeUtil { data = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); } else if (dataType == DataTypes.INT) { data = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { data = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size); } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.DOUBLE) { 
data = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 4059316..fbcbee5 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -78,7 +78,6 @@ public final class DataTypeUtil { * * @param msrValue * @param dataType - * @param carbonMeasure * @return */ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, @@ -91,7 +90,6 @@ public final class DataTypeUtil { * * @param msrValue * @param dataType - * @param carbonMeasure * @return */ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, @@ -126,6 +124,60 @@ public final class DataTypeUtil { } } + /** + * This method will convert a given value to its specific type + * + * @param dimValue + * @param dataType + * @return + */ + public static Object getNoDictionaryValueBasedOnDataType(String dimValue, DataType dataType, + int scale, int precision, boolean useConverter, String timeStampFormat) { + if (dataType == DataTypes.BOOLEAN) { + return BooleanConvert.parseBoolean(dimValue); + } else if (DataTypes.isDecimal(dataType)) { + BigDecimal bigDecimal = + new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP); + BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision); + if (useConverter) { + return converter.convertFromBigDecimalToDecimal(decimal); + } else { + return decimal; + } + } else if (dataType == DataTypes.SHORT) { + return Short.parseShort(dimValue); + } else if (dataType == DataTypes.INT) { + return Integer.parseInt(dimValue); + } else if (dataType == 
DataTypes.LONG) { + return Long.valueOf(dimValue); + } else if (dataType == DataTypes.FLOAT) { + return Float.parseFloat(dimValue); + } else if (dataType == DataTypes.BYTE) { + return Byte.parseByte(dimValue); + } else if (dataType == DataTypes.TIMESTAMP) { + Date dateToStr = null; + DateFormat dateFormatter = null; + try { + if (null != timeStampFormat && !timeStampFormat.trim().isEmpty()) { + dateFormatter = new SimpleDateFormat(timeStampFormat); + dateFormatter.setLenient(false); + } else { + dateFormatter = timeStampformatter.get(); + } + dateToStr = dateFormatter.parse(dimValue); + return dateToStr.getTime(); + } catch (ParseException e) { + throw new NumberFormatException(e.getMessage()); + } + } else { + Double parsedValue = Double.valueOf(dimValue); + if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) { + return null; + } + return parsedValue; + } + } + public static Object getMeasureObjectFromDataType(byte[] data, DataType dataType) { if (data == null || data.length == 0) { return null; @@ -1032,6 +1084,24 @@ public final class DataTypeUtil { } /** + * Method to type case the data based on modified data type. 
This method will used for + * retrieving the data after change in data type restructure operation + * + * @param data + * @param restructureDataType + * @return + */ + public static long getDataBasedOnRestructuredDataType(Object data, DataType restructureDataType) { + long value = 0L; + if (restructureDataType == DataTypes.INT) { + value = (int) data; + } else if (restructureDataType == DataTypes.LONG) { + value = (long) data; + } + return value; + } + + /** * Check if the column is a no dictionary primitive column * * @param dataType @@ -1039,9 +1109,9 @@ public final class DataTypeUtil { */ public static boolean isPrimitiveColumn(DataType dataType) { if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE || dataType == DataTypes.SHORT - || dataType == DataTypes.INT || dataType == DataTypes.LONG || DataTypes.isDecimal(dataType) - || dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE - || dataType == DataTypes.BYTE_ARRAY) { + || dataType == DataTypes.INT || dataType == DataTypes.LONG + || dataType == DataTypes.TIMESTAMP || DataTypes.isDecimal(dataType) + || dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) { return true; } return false; http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java index 2e2d94b..344ec09 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java @@ -140,7 +140,7 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap { dataField.setTimestampFormat(tsFormat); FieldConverter fieldConverter = 
FieldEncoderFactory.getInstance() .createFieldEncoder(dataField, absoluteTableIdentifier, i, nullFormat, null, false, - localCaches[i], false, parentTablePath); + localCaches[i], false, parentTablePath, false); this.name2Converters.put(indexedColumn.get(i).getColName(), fieldConverter); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java index f59202d..c639905 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java @@ -61,7 +61,7 @@ public class DataConvertUtil { return (short) 0; } else if (dataType == DataTypes.INT) { return 0; - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { return 0L; } else if (dataType == DataTypes.DOUBLE) { return 0.0; http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala index 1897c87..e3fec10 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -269,8 +269,17 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar // 
no dictionary primitive columns are expected to be in original data while loading, // so convert it to original data if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) { - val dataFromBytes = DataTypeUtil + var dataFromBytes = DataTypeUtil .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, col.getDataType) + if (dataFromBytes == null) { + dataFromBytes = DataConvertUtil + .getNullValueForMeasure(col.getDataType, col.getColumnSchema.getScale) + } + // for timestamp the above method will give the original data, so it should be + // converted again to the format to be loaded (without micros) + if (null != dataFromBytes && col.getDataType == DataTypes.TIMESTAMP) { + dataFromBytes = (dataFromBytes.asInstanceOf[Long] / 1000L).asInstanceOf[Object]; + } dataFromBytes } else { bytes http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index baf4664..c4e3517 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -1003,7 +1003,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("select * from stream_table_filter where updated is null"), Seq(Row(null, "", "", null, null, null, null, null, null))) - assert(1 == partitionNums("select * from stream_table_filter where updated is null")) + assert(3 == partitionNums("select * from stream_table_filter where updated is null")) checkAnswer( sql("select * from stream_table_filter where id is null and updated is not null"), 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java index 7dfe95f..435cf24 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java @@ -58,16 +58,24 @@ public class FieldEncoderFactory { /** * Creates the FieldConverter for all dimensions, for measures return null. * - * @param dataField column schema - * @param absoluteTableIdentifier table identifier - * @param index index of column in the row. + * @param dataField column schema + * @param absoluteTableIdentifier table identifier + * @param index index of column in the row + * @param nullFormat null format of the field + * @param client + * @param useOnePass + * @param localCache * @param isEmptyBadRecord + * @param parentTablePath + * @param isConvertToBinary whether the no dictionary field to be converted to binary or not * @return + * @throws IOException */ public FieldConverter createFieldEncoder(DataField dataField, AbsoluteTableIdentifier absoluteTableIdentifier, int index, String nullFormat, DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache, - boolean isEmptyBadRecord, String parentTablePath) throws IOException { + boolean isEmptyBadRecord, String parentTablePath, boolean isConvertToBinary) + throws IOException { // Converters are only needed for dimensions and measures it return null. 
if (dataField.getColumn().isDimension()) { if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) && @@ -112,9 +120,11 @@ public class FieldEncoderFactory { createComplexDataType(dataField, absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index); } else { - // if the no dictionary column is a numeric column then treat is as measure col + // if the no dictionary column is a numeric column and no need to convert to binary + // then treat it is as measure col // so that the adaptive encoding can be applied on it easily - if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())) { + if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType()) + && !isConvertToBinary) { return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord); } return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java index 20278e4..212037b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java @@ -20,6 +20,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; +import 
org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.loading.DataField; import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; @@ -71,6 +72,9 @@ public class MeasureFieldConverterImpl implements FieldConverter { dataField.getColumn().getDataType()); logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message); } + if (dataField.getColumn().isDimension()) { + logHolder.setReason(message); + } return null; } else if (literalValue.length() == 0) { if (isEmptyBadRecord) { @@ -87,21 +91,42 @@ public class MeasureFieldConverterImpl implements FieldConverter { return null; } else { try { - if (dataField.isUseActualData()) { - output = DataTypeUtil - .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(), - dataField.getColumn().getColumnSchema().getScale(), - dataField.getColumn().getColumnSchema().getPrecision(), true); + // in case of no dictionary dimension + if (dataField.getColumn().isDimension()) { + String dateFormat = null; + if (dataField.getColumn().getDataType() == DataTypes.DATE) { + dateFormat = dataField.getDateFormat(); + } else if (dataField.getColumn().getDataType() == DataTypes.TIMESTAMP) { + dateFormat = dataField.getTimestampFormat(); + } + if (dataField.isUseActualData()) { + output = DataTypeUtil.getNoDictionaryValueBasedOnDataType(literalValue, + dataField.getColumn().getDataType(), + dataField.getColumn().getColumnSchema().getScale(), + dataField.getColumn().getColumnSchema().getPrecision(), true, dateFormat); + } else { + output = DataTypeUtil.getNoDictionaryValueBasedOnDataType(literalValue, + dataField.getColumn().getDataType(), + dataField.getColumn().getColumnSchema().getScale(), + dataField.getColumn().getColumnSchema().getPrecision(), false, dateFormat); + } } else { - output = DataTypeUtil - .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(), - 
dataField.getColumn().getColumnSchema().getScale(), - dataField.getColumn().getColumnSchema().getPrecision()); + if (dataField.isUseActualData()) { + output = DataTypeUtil + .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(), + dataField.getColumn().getColumnSchema().getScale(), + dataField.getColumn().getColumnSchema().getPrecision(), true); + } else { + output = DataTypeUtil + .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(), + dataField.getColumn().getColumnSchema().getScale(), + dataField.getColumn().getColumnSchema().getPrecision()); + } } return output; } catch (NumberFormatException e) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Can not convert value to Numeric type value. Value considered as null."); + LOGGER.debug("Cannot convert value to Numeric type value. Value considered as null."); } logHolder.setReason(CarbonDataProcessorUtil .prepareFailureReason(dataField.getColumn().getColName(), http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java index a5e5138..2d4e167 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java @@ -69,6 +69,8 @@ public class RowConverterImpl implements RowConverter { private Map<Object, Integer>[] localCaches; + private boolean isConvertToBinary; + public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration, BadRecordsLogger badRecordLogger) { this.fields = fields; @@ -76,6 
+78,14 @@ public class RowConverterImpl implements RowConverter { this.badRecordLogger = badRecordLogger; } + public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration, + BadRecordsLogger badRecordLogger, boolean isConvertToBinary) { + this.fields = fields; + this.configuration = configuration; + this.badRecordLogger = badRecordLogger; + this.isConvertToBinary = isConvertToBinary; + } + @Override public void initialize() throws IOException { String nullFormat = @@ -95,7 +105,7 @@ public class RowConverterImpl implements RowConverter { FieldConverter fieldConverter = FieldEncoderFactory.getInstance() .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord, - configuration.getParentTablePath()); + configuration.getParentTablePath(), isConvertToBinary); fieldConverterList.add(fieldConverter); } CarbonTimeStatisticsFactory.getLoadStatisticsInstance() @@ -199,7 +209,8 @@ public class RowConverterImpl implements RowConverter { @Override public RowConverter createCopyForNewThread() { RowConverterImpl converter = - new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger); + new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger, + this.isConvertToBinary); List<FieldConverter> fieldConverterList = new ArrayList<>(); DictionaryClient client = createDictionaryClient(); dictClients.add(client); @@ -215,7 +226,7 @@ public class RowConverterImpl implements RowConverter { fieldConverter = FieldEncoderFactory.getInstance() .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord, - configuration.getParentTablePath()); + configuration.getParentTablePath(), isConvertToBinary); } catch (IOException e) { throw new RuntimeException(e); } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java index 1262fde..fa12dcc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java @@ -300,7 +300,7 @@ public class SortStepRowHandler implements Serializable { data = inputStream.readShort(); } else if (dataType == DataTypes.INT) { data = inputStream.readInt(); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { data = inputStream.readLong(); } else if (dataType == DataTypes.DOUBLE) { data = inputStream.readDouble(); @@ -384,7 +384,7 @@ public class SortStepRowHandler implements Serializable { tmpContent = rowBuffer.getShort(); } else if (DataTypes.INT == tmpDataType) { tmpContent = rowBuffer.getInt(); - } else if (DataTypes.LONG == tmpDataType) { + } else if (DataTypes.LONG == tmpDataType || DataTypes.TIMESTAMP == tmpDataType) { tmpContent = rowBuffer.getLong(); } else if (DataTypes.DOUBLE == tmpDataType) { tmpContent = rowBuffer.getDouble(); @@ -501,7 +501,7 @@ public class SortStepRowHandler implements Serializable { outputStream.writeShort((short) data); } else if (dataType == DataTypes.INT) { outputStream.writeInt((int) data); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { outputStream.writeLong((long) data); } else if (dataType == DataTypes.DOUBLE) { outputStream.writeDouble((double) data); @@ -715,6 +715,9 @@ public class SortStepRowHandler 
implements Serializable { size += 2; } else { int sizeInBytes = this.noDictSortDataTypes[idx].getSizeInBytes(); + if (this.noDictSortDataTypes[idx] == DataTypes.TIMESTAMP) { + sizeInBytes = DataTypes.LONG.getSizeInBytes(); + } CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) sizeInBytes); size += 2; // put data to unsafe according to the data types @@ -829,7 +832,7 @@ public class SortStepRowHandler implements Serializable { reUsableByteArrayDataOutputStream.writeShort((Short) tmpValue); } else if (DataTypes.INT == tmpDataType) { reUsableByteArrayDataOutputStream.writeInt((Integer) tmpValue); - } else if (DataTypes.LONG == tmpDataType) { + } else if (DataTypes.LONG == tmpDataType || DataTypes.TIMESTAMP == tmpDataType) { reUsableByteArrayDataOutputStream.writeLong((Long) tmpValue); } else if (DataTypes.DOUBLE == tmpDataType) { reUsableByteArrayDataOutputStream.writeDouble((Double) tmpValue); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 1aa6da8..6133016 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -231,7 +231,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { if (CompactionType.STREAMING == compactionType) { while (resultIterator.hasNext()) { // the input iterator of streaming segment is already using raw row - addRowForSorting(resultIterator.next()); + addRowForSorting(prepareStreamingRowObjectForSorting(resultIterator.next())); 
isRecordFound = true; } } else { @@ -256,6 +256,43 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { * @param row * @return */ + private Object[] prepareStreamingRowObjectForSorting(Object[] row) { + List<CarbonDimension> dimensions = segmentProperties.getDimensions(); + Object[] preparedRow = new Object[dimensions.size() + measureCount]; + for (int i = 0; i < dimensions.size(); i++) { + CarbonDimension dims = dimensions.get(i); + if (dims.hasEncoding(Encoding.DICTIONARY)) { + // dictionary + preparedRow[i] = row[i]; + } else { + // no dictionary dims + if (DataTypeUtil.isPrimitiveColumn(dims.getDataType()) && !dims.isComplex()) { + // no dictionary measure columns are expected as original data + preparedRow[i] = DataTypeUtil + .getDataBasedOnDataTypeForNoDictionaryColumn((byte[]) row[i], dims.getDataType()); + // for timestamp the above method will give the original data, so it should be + // converted again to the format to be loaded (without micros) + if (null != preparedRow[i] && dims.getDataType() == DataTypes.TIMESTAMP) { + preparedRow[i] = (long) preparedRow[i] / 1000L; + } + } else { + preparedRow[i] = row[i]; + } + } + } + // fill all the measures + for (int i = 0; i < measureCount; i++) { + preparedRow[dimensionColumnCount + i] = row[dimensionColumnCount + i]; + } + return preparedRow; + } + + /** + * This method will prepare the data from raw object that will take part in sorting + * + * @param row + * @return + */ private Object[] prepareRowObjectForSorting(Object[] row) { ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0]; // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount]; @@ -283,6 +320,11 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { preparedRow[i] = DataTypeUtil .getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex, dims.getDataType()); + // for timestamp the above method will give the original data, so it should be + // converted again to the format 
to be loaded (without micros) + if (null != preparedRow[i] && dims.getDataType() == DataTypes.TIMESTAMP) { + preparedRow[i] = (long) preparedRow[i] / 1000L; + } } else { preparedRow[i] = noDictionaryKeyByIndex; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index 2f49ef2..791b4c6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -131,8 +131,13 @@ public class TablePage { columnPageEncoderMeta, pageSize, localDictionaryGenerator, false); } else { if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) { - columnPageEncoderMeta = - new ColumnPageEncoderMeta(spec, spec.getSchemaDataType(), columnCompressor); + if (spec.getSchemaDataType() == DataTypes.TIMESTAMP) { + columnPageEncoderMeta = + new ColumnPageEncoderMeta(spec, DataTypes.LONG, columnCompressor); + } else { + columnPageEncoderMeta = + new ColumnPageEncoderMeta(spec, spec.getSchemaDataType(), columnCompressor); + } // create the column page according to the data type for no dictionary numeric columns if (DataTypes.isDecimal(spec.getSchemaDataType())) { page = ColumnPage.newDecimalPage(columnPageEncoderMeta, pageSize); @@ -147,7 +152,12 @@ public class TablePage { if (DataTypes.VARCHAR == dataType) { page.setStatsCollector(LVLongStringStatsCollector.newInstance()); } else if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) { - page.setStatsCollector(PrimitivePageStatsCollector.newInstance(spec.getSchemaDataType())); + if (spec.getSchemaDataType() == DataTypes.TIMESTAMP) { + 
page.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataTypes.LONG)); + } else { + page.setStatsCollector( + PrimitivePageStatsCollector.newInstance(spec.getSchemaDataType())); + } } else { page.setStatsCollector(LVShortStringStatsCollector.newInstance()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5443b227/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java index eeb9cc1..0d2a889 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java @@ -156,7 +156,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { // initialize parser and converter rowParser = new RowParserImpl(dataFields, configuration); badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration); - converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger); + converter = + new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger, true); configuration.setCardinalityFinder(converter); converter.initialize();
