http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java index 29e3060..29a4098 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java @@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; /** * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data @@ -61,8 +62,12 @@ public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements D } @Override - protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) { - return value; + protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) { + // no dictionary measure columns will be of original data, so convert it to bytes + if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) { + return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value); + } + return (byte[]) value; } @Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java index cad9787..61bd036 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java @@ -29,6 +29,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.Predicate; @@ -73,11 +74,14 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter { } } - protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) { + protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) { if (DataTypes.VARCHAR == indexColumns.get(indexColIdx).getDataType()) { - return DataConvertUtil.getRawBytesForVarchar(value); + return DataConvertUtil.getRawBytesForVarchar((byte[]) value); + } else if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) { + // get bytes for the original value of the no dictionary column + return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value); } else { - return DataConvertUtil.getRawBytes(value); + return DataConvertUtil.getRawBytes((byte[]) value); } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java index 7cd241a..5525941 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java @@ -245,7 +245,7 @@ public class StoreCreator { date.setEncodingList(encodings); date.setColumnUniqueId(UUID.randomUUID().toString()); date.setDimensionColumn(true); - date.setColumnReferenceId(id.getColumnUniqueId()); + date.setColumnReferenceId(date.getColumnUniqueId()); date.setSchemaOrdinal(schemaOrdinal++); if (sortColumns.contains(date.getColumnName())) { date.setSortColumn(true); @@ -263,7 +263,7 @@ public class StoreCreator { if (sortColumns.contains(country.getColumnName())) { country.setSortColumn(true); } - country.setColumnReferenceId(id.getColumnUniqueId()); + country.setColumnReferenceId(country.getColumnUniqueId()); columnSchemas.add(country); ColumnSchema name = new ColumnSchema(); @@ -276,7 +276,7 @@ public class StoreCreator { if (sortColumns.contains(name.getColumnName())) { name.setSortColumn(true); } - name.setColumnReferenceId(id.getColumnUniqueId()); + name.setColumnReferenceId(name.getColumnUniqueId()); columnSchemas.add(name); ColumnSchema phonetype = new ColumnSchema(); @@ -289,7 +289,7 @@ public class StoreCreator { if (sortColumns.contains(phonetype.getColumnName())) { phonetype.setSortColumn(true); } - phonetype.setColumnReferenceId(id.getColumnUniqueId()); + phonetype.setColumnReferenceId(phonetype.getColumnUniqueId()); columnSchemas.add(phonetype); ColumnSchema serialname = new ColumnSchema(); @@ -302,7 +302,7 @@ public class StoreCreator { if 
(sortColumns.contains(serialname.getColumnName())) { serialname.setSortColumn(true); } - serialname.setColumnReferenceId(id.getColumnUniqueId()); + serialname.setColumnReferenceId(serialname.getColumnUniqueId()); columnSchemas.add(serialname); ColumnSchema salary = new ColumnSchema(); salary.setColumnName("salary"); @@ -310,11 +310,13 @@ public class StoreCreator { salary.setEncodingList(new ArrayList<Encoding>()); salary.setColumnUniqueId(UUID.randomUUID().toString()); salary.setDimensionColumn(false); - salary.setColumnReferenceId(id.getColumnUniqueId()); + salary.setColumnReferenceId(salary.getColumnUniqueId()); salary.setSchemaOrdinal(schemaOrdinal++); columnSchemas.add(salary); - tableSchema.setListOfColumns(columnSchemas); + // rearrange the column schema based on the sort order, if sort columns exists + List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas); + tableSchema.setListOfColumns(columnSchemas1); SchemaEvolution schemaEvol = new SchemaEvolution(); schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>()); tableSchema.setSchemaEvolution(schemaEvol); @@ -352,6 +354,29 @@ public class StoreCreator { return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName()); } + private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) { + List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size()); + // add sort columns first + for (ColumnSchema columnSchema : columnSchemas) { + if (columnSchema.isSortColumn()) { + newColumnSchema.add(columnSchema); + } + } + // add other dimension columns + for (ColumnSchema columnSchema : columnSchemas) { + if (!columnSchema.isSortColumn() && columnSchema.isDimensionColumn()) { + newColumnSchema.add(columnSchema); + } + } + // add measure columns + for (ColumnSchema columnSchema : columnSchemas) { + if (!columnSchema.isDimensionColumn()) { + newColumnSchema.add(columnSchema); + } + } + return newColumnSchema; + } + private void 
writeDictionary(String factFilePath, CarbonTable table) throws Exception { BufferedReader reader = new BufferedReader(new InputStreamReader( new FileInputStream(factFilePath), "UTF-8")); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index be40b13..e810829 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -86,7 +86,9 @@ object DataLoadProcessBuilderOnSpark { val sortParameters = SortParameters.createSortParameters(configuration) val rowComparator: Comparator[Array[AnyRef]] = if (sortParameters.getNoDictionaryCount > 0) { - new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn) + new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn, + sortParameters.getNoDictionarySortColumn, + sortParameters.getNoDictDataType) } else { new NewRowComparatorForNormalDims(sortParameters.getDimColCount) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala index c97732a..727191c 100644 --- 
a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala @@ -805,7 +805,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { model.setSegmentId("0") store.createCarbonStore(model) FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0/0")) - store.setSortColumns(new util.ArrayList[String](Seq("country,phonetype").asJava)) + store.setSortColumns(new util.ArrayList[String](Seq("country","phonetype").asJava)) model = store.createTableAndLoadModel(false) model.setSegmentId("1") store.createCarbonStore(model) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala index 0584fb1..1897c87 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -52,7 +52,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSch import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.{CarbonUtil, TaskMetricsMap} +import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil, TaskMetricsMap} import org.apache.carbondata.core.util.path.CarbonTablePath import 
org.apache.carbondata.datamap.bloom.DataConvertUtil import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus} @@ -264,8 +264,17 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) { surrogatKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer] } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) { - data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex( + val bytes = data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex( indexCol2IdxInNoDictArray(col.getColName)) + // no dictionary primitive columns are expected to be in original data while loading, + // so convert it to original data + if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) { + val dataFromBytes = DataTypeUtil + .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, col.getDataType) + dataFromBytes + } else { + bytes + } } else { // measures start from 1 val value = data(1 + indexCol2IdxInMeasureArray(col.getColName)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala index ad6823d..fcb6110 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala @@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA assertResult(2)(result.length) assertResult("table_info1")(result(0).getString(0)) - // 2087 is the size 
of carbon table. Note that since 1.5.0, we add additional compressor name in metadata - assertResult(2216)(result(0).getLong(1)) + // 2220 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata + assertResult(2220)(result(0).getLong(1)) assertResult("table_info2")(result(1).getString(0)) - assertResult(2216)(result(1).getLong(1)) + assertResult(2220)(result(1).getLong(1)) } override def afterAll: Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index 4d85296..616edeb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; @@ -339,6 +340,45 @@ public class CarbonDataLoadConfiguration { return type; } + /** + * Get the data types of the no dictionary and the complex dimensions of the table + * + * @return + */ + public CarbonColumn[] getNoDictAndComplexDimensions() { + List<Integer> noDicOrCompIndexes = new 
ArrayList<>(dataFields.length); + int noDicCount = 0; + for (int i = 0; i < dataFields.length; i++) { + if (dataFields[i].getColumn().isDimension() && ( + !(dataFields[i].getColumn().hasEncoding(Encoding.DICTIONARY)) || dataFields[i].getColumn() + .isComplex())) { + noDicOrCompIndexes.add(i); + noDicCount++; + } + } + + CarbonColumn[] dims = new CarbonColumn[noDicCount]; + for (int i = 0; i < dims.length; i++) { + dims[i] = dataFields[noDicOrCompIndexes.get(i)].getColumn(); + } + return dims; + } + + /** + * Get the sort column mapping of the table + * + * @return + */ + public boolean[] getSortColumnMapping() { + boolean[] sortColumnMapping = new boolean[dataFields.length]; + for (int i = 0; i < sortColumnMapping.length; i++) { + if (dataFields[i].getColumn().getColumnSchema().isSortColumn()) { + sortColumnMapping[i] = true; + } + } + return sortColumnMapping; + } + public int[] calcDimensionLengths() { int[] dimLensWithComplex = getCardinalityFinder().getCardinality(); if (!isSortTable()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java index 86f273d..7dfe95f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import 
org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.datatypes.ArrayDataType; import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.datatypes.PrimitiveDataType; @@ -111,6 +112,11 @@ public class FieldEncoderFactory { createComplexDataType(dataField, absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index); } else { + // if the no dictionary column is a numeric column then treat is as measure col + // so that the adaptive encoding can be applied on it easily + if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())) { + return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord); + } return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord); } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java index 9cbd607..20278e4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java @@ -20,8 +20,6 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; -import 
org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.loading.DataField; import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; @@ -39,10 +37,6 @@ public class MeasureFieldConverterImpl implements FieldConverter { private int index; - private DataType dataType; - - private CarbonMeasure measure; - private String nullformat; private boolean isEmptyBadRecord; @@ -51,8 +45,6 @@ public class MeasureFieldConverterImpl implements FieldConverter { public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index, boolean isEmptyBadRecord) { - this.dataType = dataField.getColumn().getDataType(); - this.measure = (CarbonMeasure) dataField.getColumn(); this.nullformat = nullformat; this.index = index; this.isEmptyBadRecord = isEmptyBadRecord; @@ -73,20 +65,20 @@ public class MeasureFieldConverterImpl implements FieldConverter { Object output; boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(literalValue); if (literalValue == null || isNull) { - String message = logHolder.getColumnMessageMap().get(measure.getColName()); + String message = logHolder.getColumnMessageMap().get(dataField.getColumn().getColName()); if (null == message) { - message = CarbonDataProcessorUtil - .prepareFailureReason(measure.getColName(), measure.getDataType()); - logHolder.getColumnMessageMap().put(measure.getColName(), message); + message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(), + dataField.getColumn().getDataType()); + logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message); } return null; } else if (literalValue.length() == 0) { if (isEmptyBadRecord) { - String message = logHolder.getColumnMessageMap().get(measure.getColName()); + String message = 
logHolder.getColumnMessageMap().get(dataField.getColumn().getColName()); if (null == message) { - message = CarbonDataProcessorUtil - .prepareFailureReason(measure.getColName(), measure.getDataType()); - logHolder.getColumnMessageMap().put(measure.getColName(), message); + message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(), + dataField.getColumn().getDataType()); + logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message); } logHolder.setReason(message); } @@ -96,18 +88,24 @@ public class MeasureFieldConverterImpl implements FieldConverter { } else { try { if (dataField.isUseActualData()) { - output = - DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure, true); + output = DataTypeUtil + .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(), + dataField.getColumn().getColumnSchema().getScale(), + dataField.getColumn().getColumnSchema().getPrecision(), true); } else { - output = DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure); + output = DataTypeUtil + .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(), + dataField.getColumn().getColumnSchema().getScale(), + dataField.getColumn().getColumnSchema().getPrecision()); } return output; } catch (NumberFormatException e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Can not convert value to Numeric type value. 
Value considered as null."); } - logHolder.setReason( - CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType)); + logHolder.setReason(CarbonDataProcessorUtil + .prepareFailureReason(dataField.getColumn().getColName(), + dataField.getColumn().getDataType())); return null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java index 64b64f5..3a325a4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java @@ -21,7 +21,10 @@ import java.util.Comparator; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.comparator.SerializableComparator; /** * comparator for the converted row. The row has not been rearranged as 3-parted yet. 
@@ -30,23 +33,38 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; public class RawRowComparator implements Comparator<CarbonRow> { private int[] sortColumnIndices; private boolean[] isSortColumnNoDict; + private DataType[] noDicDataTypes; - public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict) { + public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict, + DataType[] noDicDataTypes) { this.sortColumnIndices = sortColumnIndices; this.isSortColumnNoDict = isSortColumnNoDict; + this.noDicDataTypes = noDicDataTypes; } @Override public int compare(CarbonRow o1, CarbonRow o2) { int diff = 0; int i = 0; + int noDicIdx = 0; for (int colIdx : sortColumnIndices) { if (isSortColumnNoDict[i]) { - byte[] colA = (byte[]) o1.getObject(colIdx); - byte[] colB = (byte[]) o2.getObject(colIdx); - diff = UnsafeComparer.INSTANCE.compareTo(colA, colB); - if (diff != 0) { - return diff; + if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[noDicIdx])) { + // for no dictionary numeric column get comparator based on the data type + SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator + .getComparator(noDicDataTypes[noDicIdx]); + int difference = comparator.compare(o1.getObject(colIdx), o2.getObject(colIdx)); + if (difference != 0) { + return difference; + } + noDicIdx++; + } else { + byte[] colA = (byte[]) o1.getObject(colIdx); + byte[] colB = (byte[]) o2.getObject(colIdx); + diff = UnsafeComparer.INSTANCE.compareTo(colA, colB); + if (diff != 0) { + return diff; + } } } else { int colA = (int) o1.getObject(colIdx); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java 
b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java index 1ad7879..844e45e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java @@ -23,7 +23,7 @@ package org.apache.carbondata.processing.loading.row; */ public class IntermediateSortTempRow { private int[] dictSortDims; - private byte[][] noDictSortDims; + private Object[] noDictSortDims; /** * this will be used for intermediate merger when * no sort field and measure field will not be @@ -35,14 +35,14 @@ public class IntermediateSortTempRow { */ private Object[] measures; - public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims, + public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims, byte[] noSortDimsAndMeasures) { this.dictSortDims = dictSortDims; this.noDictSortDims = noDictSortDims; this.noSortDimsAndMeasures = noSortDimsAndMeasures; } - public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims, + public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims, Object[] measures) { this.dictSortDims = dictSortDims; this.noDictSortDims = noDictSortDims; @@ -57,7 +57,7 @@ public class IntermediateSortTempRow { return measures; } - public byte[][] getNoDictSortDims() { + public Object[] getNoDictSortDims() { return noDictSortDims; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java index 697f590..edfd317 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java @@ -23,10 +23,13 @@ import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.CarbonUnsafeUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; @@ -65,6 +68,14 @@ public class SortStepRowHandler implements Serializable { private DataType[] dataTypes; + private DataType[] noDictSortDataTypes; + + private boolean[] noDictSortColMapping; + + private DataType[] noDictNoSortDataTypes; + + private boolean[] noDictNoSortColMapping; + /** * constructor * @param tableFieldStat table field stat @@ -85,6 +96,16 @@ public class SortStepRowHandler implements Serializable { this.complexDimIdx = tableFieldStat.getComplexDimIdx(); this.measureIdx = tableFieldStat.getMeasureIdx(); this.dataTypes = tableFieldStat.getMeasureDataType(); + this.noDictSortDataTypes = tableFieldStat.getNoDictSortDataType(); + noDictSortColMapping = new boolean[noDictSortDataTypes.length]; + for (int i = 0; i < noDictSortDataTypes.length; i++) { + noDictSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictSortDataTypes[i]); + } + this.noDictNoSortDataTypes = tableFieldStat.getNoDictNoSortDataType(); + noDictNoSortColMapping = new boolean[noDictNoSortDataTypes.length]; + for (int i = 0; i < noDictNoSortDataTypes.length; i++) { + noDictNoSortColMapping[i] = 
DataTypeUtil.isPrimitiveColumn(noDictNoSortDataTypes[i]); + } } /** @@ -108,8 +129,8 @@ public class SortStepRowHandler implements Serializable { try { int[] dictDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt]; - byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt - + this.varcharDimCnt + this.complexDimCnt ][]; + Object[] nonDictArray = new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + + this.varcharDimCnt + this.complexDimCnt]; Object[] measures = new Object[this.measureCnt]; // convert dict & data @@ -125,19 +146,19 @@ public class SortStepRowHandler implements Serializable { // convert no-dict & sort idxAcc = 0; for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { - nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]]; + nonDictArray[idxAcc++] = row[this.noDictSortDimIdx[idx]]; } // convert no-dict & no-sort for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) { - nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]]; + nonDictArray[idxAcc++] = row[this.noDictNoSortDimIdx[idx]]; } // convert varchar dims for (int idx = 0; idx < this.varcharDimCnt; idx++) { - nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]]; + nonDictArray[idxAcc++] = row[this.varcharDimIdx[idx]]; } // convert complex dims for (int idx = 0; idx < this.complexDimCnt; idx++) { - nonDictArray[idxAcc++] = (byte[]) row[this.complexDimIdx[idx]]; + nonDictArray[idxAcc++] = row[this.complexDimIdx[idx]]; } // convert measure data @@ -178,7 +199,7 @@ public class SortStepRowHandler implements Serializable { public IntermediateSortTempRow readWithoutNoSortFieldConvert( DataInputStream inputStream) throws IOException { int[] dictSortDims = new int[this.dictSortDimCnt]; - byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][]; + Object[] noDictSortDims = new Object[this.noDictSortDimCnt]; // read dict & sort dim data for (int idx = 0; idx < this.dictSortDimCnt; idx++) { @@ -187,10 +208,8 @@ public class 
SortStepRowHandler implements Serializable { // read no-dict & sort data for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { - short len = inputStream.readShort(); - byte[] bytes = new byte[len]; - inputStream.readFully(bytes); - noDictSortDims[idx] = bytes; + // for no dict measure column get the original data + noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx); } // read no-dict dims & measures @@ -213,9 +232,9 @@ public class SortStepRowHandler implements Serializable { public IntermediateSortTempRow readWithNoSortFieldConvert( DataInputStream inputStream) throws IOException { int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt]; - byte[][] noDictSortDims = - new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt - + this.complexDimCnt][]; + Object[] noDictSortDims = + new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt + + this.complexDimCnt]; // read dict & sort dim data for (int idx = 0; idx < this.dictSortDimCnt; idx++) { @@ -224,10 +243,8 @@ public class SortStepRowHandler implements Serializable { // read no-dict & sort data for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { - short len = inputStream.readShort(); - byte[] bytes = new byte[len]; - inputStream.readFully(bytes); - noDictSortDims[idx] = bytes; + // for no dict measure column get the original data + noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx); } // read no-dict dims & measures @@ -240,8 +257,63 @@ public class SortStepRowHandler implements Serializable { return new IntermediateSortTempRow(dictSortDims, noDictSortDims,measure); } + /** + * Return the data from the stream according to the column type + * + * @param inputStream + * @param idx + * @throws IOException + */ + private Object getDataForNoDictSortColumn(DataInputStream inputStream, int idx) + throws IOException { + if (this.noDictSortColMapping[idx]) { + return readDataFromStream(inputStream, idx); + } else { + short 
len = inputStream.readShort(); + byte[] bytes = new byte[len]; + inputStream.readFully(bytes); + return bytes; + } + } + + /** + * Read the data from the stream + * + * @param inputStream + * @param idx + * @return + * @throws IOException + */ + private Object readDataFromStream(DataInputStream inputStream, int idx) throws IOException { + DataType dataType = noDictSortDataTypes[idx]; + Object data = null; + if (!inputStream.readBoolean()) { + return null; + } + if (dataType == DataTypes.BOOLEAN) { + data = inputStream.readBoolean(); + } else if (dataType == DataTypes.BYTE) { + data = inputStream.readByte(); + } else if (dataType == DataTypes.SHORT) { + data = inputStream.readShort(); + } else if (dataType == DataTypes.INT) { + data = inputStream.readInt(); + } else if (dataType == DataTypes.LONG) { + data = inputStream.readLong(); + } else if (dataType == DataTypes.DOUBLE) { + data = inputStream.readDouble(); + } else if (dataType == DataTypes.FLOAT) { + data = inputStream.readFloat(); + } else if (dataType == DataTypes.BYTE_ARRAY || DataTypes.isDecimal(dataType)) { + byte[] bytes = + inputStream.readUTF().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + data = bytes; + } + return data; + } + private void unpackNoSortFromBytes(byte[] noSortDimsAndMeasures, int[] dictDims, - byte[][] noDictDims, Object[] measures) { + Object[] noDictDims, Object[] measures) { ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures); // read dict_no_sort for (int i = dictSortDimCnt; i < dictDims.length; i++) { @@ -251,10 +323,15 @@ public class SortStepRowHandler implements Serializable { int noDictIndex = noDictSortDimCnt; // read no_dict_no_sort for (int i = 0; i < noDictNoSortDimCnt; i++) { - short len = rowBuffer.getShort(); - byte[] bytes = new byte[len]; - rowBuffer.get(bytes); - noDictDims[noDictIndex++] = bytes; + // for no dict measure column get the original data + if (this.noDictNoSortColMapping[i]) { + noDictDims[noDictIndex++] = 
getDataFromRowBuffer(noDictNoSortDataTypes[i], rowBuffer); + } else { + short len = rowBuffer.getShort(); + byte[] bytes = new byte[len]; + rowBuffer.get(bytes); + noDictDims[noDictIndex++] = bytes; + } } // read varchar dims @@ -275,39 +352,49 @@ public class SortStepRowHandler implements Serializable { // read measure int measureCnt = measures.length; - DataType tmpDataType; Object tmpContent; for (short idx = 0 ; idx < measureCnt; idx++) { - if ((byte) 0 == rowBuffer.get()) { - measures[idx] = null; - continue; - } + tmpContent = getDataFromRowBuffer(dataTypes[idx], rowBuffer); + measures[idx] = tmpContent; + } + } - tmpDataType = dataTypes[idx]; - if (DataTypes.BOOLEAN == tmpDataType) { - if ((byte) 1 == rowBuffer.get()) { - tmpContent = true; - } else { - tmpContent = false; - } - } else if (DataTypes.SHORT == tmpDataType) { - tmpContent = rowBuffer.getShort(); - } else if (DataTypes.INT == tmpDataType) { - tmpContent = rowBuffer.getInt(); - } else if (DataTypes.LONG == tmpDataType) { - tmpContent = rowBuffer.getLong(); - } else if (DataTypes.DOUBLE == tmpDataType) { - tmpContent = rowBuffer.getDouble(); - } else if (DataTypes.isDecimal(tmpDataType)) { - short len = rowBuffer.getShort(); - byte[] decimalBytes = new byte[len]; - rowBuffer.get(decimalBytes); - tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes); + /** + * Retrieve/Get the data from the row buffer. 
+ * + * @param tmpDataType + * @param rowBuffer + * @return + */ + private Object getDataFromRowBuffer(DataType tmpDataType, ByteBuffer rowBuffer) { + Object tmpContent; + if ((byte) 0 == rowBuffer.get()) { + return null; + } + + if (DataTypes.BOOLEAN == tmpDataType) { + if ((byte) 1 == rowBuffer.get()) { + tmpContent = true; } else { - throw new IllegalArgumentException("Unsupported data type: " + tmpDataType); + tmpContent = false; } - measures[idx] = tmpContent; + } else if (DataTypes.SHORT == tmpDataType) { + tmpContent = rowBuffer.getShort(); + } else if (DataTypes.INT == tmpDataType) { + tmpContent = rowBuffer.getInt(); + } else if (DataTypes.LONG == tmpDataType) { + tmpContent = rowBuffer.getLong(); + } else if (DataTypes.DOUBLE == tmpDataType) { + tmpContent = rowBuffer.getDouble(); + } else if (DataTypes.isDecimal(tmpDataType)) { + short len = rowBuffer.getShort(); + byte[] decimalBytes = new byte[len]; + rowBuffer.get(decimalBytes); + tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes); + } else { + throw new IllegalArgumentException("Unsupported data type: " + tmpDataType); } + return tmpContent; } /** @@ -327,9 +414,14 @@ public class SortStepRowHandler implements Serializable { // write no-dict & sort dim for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { - byte[] bytes = sortTempRow.getNoDictSortDims()[idx]; - outputStream.writeShort(bytes.length); - outputStream.write(bytes); + if (this.noDictSortColMapping[idx]) { + // write the original data to the stream + writeDataToStream(sortTempRow.getNoDictSortDims()[idx], outputStream, idx); + } else { + byte[] bytes = (byte[]) sortTempRow.getNoDictSortDims()[idx]; + outputStream.writeShort(bytes.length); + outputStream.write(bytes); + } } // write packed no-sort dim & measure @@ -359,9 +451,14 @@ public class SortStepRowHandler implements Serializable { // write no-dict & sort for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { - byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]]; - 
outputStream.writeShort(bytes.length); - outputStream.write(bytes); + if (this.noDictSortColMapping[idx]) { + // write the original data to the stream + writeDataToStream(row[this.noDictSortDimIdx[idx]], outputStream, idx); + } else { + byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]]; + outputStream.writeShort(bytes.length); + outputStream.write(bytes); + } } // pack no-sort @@ -376,6 +473,46 @@ public class SortStepRowHandler implements Serializable { } /** + * Write the data to stream + * + * @param data + * @param outputStream + * @param idx + * @throws IOException + */ + private void writeDataToStream(Object data, DataOutputStream outputStream, int idx) + throws IOException { + DataType dataType = noDictSortDataTypes[idx]; + if (null == data) { + outputStream.writeBoolean(false); + } else { + outputStream.writeBoolean(true); + if (dataType == DataTypes.BOOLEAN) { + outputStream.writeBoolean((boolean) data); + } else if (dataType == DataTypes.BYTE) { + outputStream.writeByte((byte) data); + } else if (dataType == DataTypes.SHORT) { + outputStream.writeShort((short) data); + } else if (dataType == DataTypes.INT) { + outputStream.writeInt((int) data); + } else if (dataType == DataTypes.LONG) { + outputStream.writeLong((long) data); + } else if (dataType == DataTypes.DOUBLE) { + outputStream.writeDouble((double) data); + } else if (DataTypes.isDecimal(dataType)) { + BigDecimal val = (BigDecimal) data; + byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val); + outputStream.writeShort(bigDecimalInBytes.length); + outputStream.write(bigDecimalInBytes); + } else if (dataType == DataTypes.FLOAT) { + outputStream.writeFloat((float) data); + } else if (dataType == DataTypes.BYTE_ARRAY) { + outputStream.writeUTF(data.toString()); + } + } + } + + /** * Read intermediate sort temp row from unsafe memory. * This method is used during merge sort phase for off-heap sort. 
* @@ -430,9 +567,9 @@ public class SortStepRowHandler implements Serializable { int size = 0; int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt]; - byte[][] noDictSortDims = - new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt - + this.complexDimCnt][]; + Object[] noDictSortDims = + new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt + + this.complexDimCnt]; // read dict & sort dim for (int idx = 0; idx < dictSortDimCnt; idx++) { @@ -444,11 +581,24 @@ public class SortStepRowHandler implements Serializable { for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); size += 2; - byte[] bytes = new byte[length]; - CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, - bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); - size += length; - noDictSortDims[idx] = bytes; + if (this.noDictSortColMapping[idx]) { + // get the original data from the unsafe memory + if (0 == length) { + // if the length is 0, then the data is null + noDictSortDims[idx] = null; + } else { + Object data = CarbonUnsafeUtil + .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length); + size += length; + noDictSortDims[idx] = data; + } + } else { + byte[] bytes = new byte[length]; + CarbonUnsafe.getUnsafe() + .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); + size += length; + noDictSortDims[idx] = bytes; + } } // read no-sort dims & measures @@ -487,13 +637,26 @@ public class SortStepRowHandler implements Serializable { for (int idx = 0; idx < noDictSortDimCnt; idx++) { short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); size += 2; - byte[] bytes = new byte[length]; - CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, - bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); - size += length; + if (this.noDictSortColMapping[idx]) { + // get the 
original data from unsafe memory + if (0 == length) { + // if the length is 0, then the data is null + writeDataToStream(null, outputStream, idx); + } else { + Object data = CarbonUnsafeUtil + .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length); + size += length; + writeDataToStream(data, outputStream, idx); + } + } else { + byte[] bytes = new byte[length]; + CarbonUnsafe.getUnsafe() + .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); + size += length; - outputStream.writeShort(length); - outputStream.write(bytes); + outputStream.writeShort(length); + outputStream.write(bytes); + } } // packed no-sort & measure @@ -534,13 +697,31 @@ public class SortStepRowHandler implements Serializable { // write no-dict & sort for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { - byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]]; - CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length); - size += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, - bytes.length); - size += bytes.length; + if (this.noDictSortColMapping[idx]) { + Object data = row[this.noDictSortDimIdx[idx]]; + if (null == data) { + // if the data is null, then write only the length as 0. 
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 0); + size += 2; + } else { + int sizeInBytes = this.noDictSortDataTypes[idx].getSizeInBytes(); + CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) sizeInBytes); + size += 2; + // put data to unsafe according to the data types + CarbonUnsafeUtil + .putDataToUnsafe(noDictSortDataTypes[idx], data, baseObject, address, size, + sizeInBytes); + size += sizeInBytes; + } + } else { + byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]]; + CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length); + size += 2; + CarbonUnsafe.getUnsafe() + .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, + bytes.length); + size += bytes.length; + } } // convert pack no-sort @@ -574,9 +755,15 @@ public class SortStepRowHandler implements Serializable { } // convert no-dict & no-sort for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) { - byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]]; - rowBuffer.putShort((short) bytes.length); - rowBuffer.put(bytes); + if (this.noDictNoSortColMapping[idx]) { + // put the original data to buffer + putDataToRowBuffer(this.noDictNoSortDataTypes[idx], row[this.noDictNoSortDimIdx[idx]], + rowBuffer); + } else { + byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]]; + rowBuffer.putShort((short) bytes.length); + rowBuffer.put(bytes); + } } // convert varchar dims for (int idx = 0; idx < this.varcharDimCnt; idx++) { @@ -592,37 +779,45 @@ public class SortStepRowHandler implements Serializable { } // convert measure - Object tmpValue; - DataType tmpDataType; for (int idx = 0; idx < this.measureCnt; idx++) { - tmpValue = row[this.measureIdx[idx]]; - tmpDataType = this.dataTypes[idx]; - if (null == tmpValue) { - rowBuffer.put((byte) 0); - continue; - } - rowBuffer.put((byte) 1); - if (DataTypes.BOOLEAN == tmpDataType) { - if ((boolean) tmpValue) { - rowBuffer.put((byte) 1); - } else { - 
rowBuffer.put((byte) 0); - } - } else if (DataTypes.SHORT == tmpDataType) { - rowBuffer.putShort((Short) tmpValue); - } else if (DataTypes.INT == tmpDataType) { - rowBuffer.putInt((Integer) tmpValue); - } else if (DataTypes.LONG == tmpDataType) { - rowBuffer.putLong((Long) tmpValue); - } else if (DataTypes.DOUBLE == tmpDataType) { - rowBuffer.putDouble((Double) tmpValue); - } else if (DataTypes.isDecimal(tmpDataType)) { - byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue); - rowBuffer.putShort((short) decimalBytes.length); - rowBuffer.put(decimalBytes); + putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]], rowBuffer); + } + } + + /** + * Put the data to the row buffer + * + * @param tmpDataType + * @param tmpValue + * @param rowBuffer + */ + private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue, ByteBuffer rowBuffer) { + if (null == tmpValue) { + rowBuffer.put((byte) 0); + return; + } + rowBuffer.put((byte) 1); + if (DataTypes.BOOLEAN == tmpDataType) { + if ((boolean) tmpValue) { + rowBuffer.put((byte) 1); } else { - throw new IllegalArgumentException("Unsupported data type: " + tmpDataType); + rowBuffer.put((byte) 0); } + } else if (DataTypes.SHORT == tmpDataType) { + rowBuffer.putShort((Short) tmpValue); + } else if (DataTypes.INT == tmpDataType) { + rowBuffer.putInt((Integer) tmpValue); + } else if (DataTypes.LONG == tmpDataType) { + rowBuffer.putLong((Long) tmpValue); + } else if (DataTypes.DOUBLE == tmpDataType) { + rowBuffer.putDouble((Double) tmpValue); + } else if (DataTypes.isDecimal(tmpDataType)) { + byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue); + rowBuffer.putShort((short) decimalBytes.length); + rowBuffer.put(decimalBytes); + } else { + throw new IllegalArgumentException("Unsupported data type: " + tmpDataType); } } + } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java index 8f29cee..b0109fa 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java @@ -20,7 +20,11 @@ package org.apache.carbondata.processing.loading.sort.unsafe.comparator; import java.util.Comparator; import org.apache.carbondata.core.memory.CarbonUnsafe; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; +import org.apache.carbondata.core.util.CarbonUnsafeUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.comparator.SerializableComparator; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow; import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; @@ -52,6 +56,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> { long rowA = rowL.address; long rowB = rowR.address; int sizeInDictPartA = 0; + int noDicSortIdx = 0; int sizeInNonDictPartA = 0; int sizeInDictPartB = 0; @@ -60,25 +65,50 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> { if (isNoDictionary) { short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA); - byte[] byteArr1 = new byte[lengthA]; 
sizeInNonDictPartA += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA, - byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA); - sizeInNonDictPartA += lengthA; - short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB); - byte[] byteArr2 = new byte[lengthB]; sizeInNonDictPartB += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB, - byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB); - sizeInNonDictPartB += lengthB; + DataType dataType = tableFieldStat.getNoDictDataType()[noDicSortIdx++]; + if (DataTypeUtil.isPrimitiveColumn(dataType)) { + Object data1 = null; + if (0 != lengthA) { + data1 = CarbonUnsafeUtil + .getDataFromUnsafe(dataType, baseObjectL, rowA + dictSizeInMemory, + sizeInNonDictPartA, lengthA); + sizeInNonDictPartA += lengthA; + } + Object data2 = null; + if (0 != lengthB) { + data2 = CarbonUnsafeUtil + .getDataFromUnsafe(dataType, baseObjectR, rowB + dictSizeInMemory, + sizeInNonDictPartB, lengthB); + sizeInNonDictPartB += lengthB; + } + // use the data type based comparator for the no dictionary encoded columns + SerializableComparator comparator = + org.apache.carbondata.core.util.comparator.Comparator.getComparator(dataType); + int difference = comparator.compare(data1, data2); + if (difference != 0) { + return difference; + } + } else { + byte[] byteArr1 = new byte[lengthA]; + CarbonUnsafe.getUnsafe() + .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA, byteArr1, + CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA); + sizeInNonDictPartA += lengthA; + + byte[] byteArr2 = new byte[lengthB]; + CarbonUnsafe.getUnsafe() + .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB, byteArr2, + CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB); + sizeInNonDictPartB += lengthB; - int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); - if (difference != 0) { - return 
difference; + int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); + if (difference != 0) { + return difference; + } } } else { int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java index 102b057..b805d37 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java @@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger; @@ -43,6 +44,8 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder { private IntermediateSortTempRow currentRow; + private DataType[] noDictDataType; + public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger, boolean[] noDictSortColumnMapping) { this.actualSize = merger.getEntryCount(); @@ -52,8 +55,10 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder { for (UnsafeCarbonRowPage rowPage: 
rowPages) { rowPage.setReadConvertedNoSortField(); } + this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType(); LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize); - this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping); + this.comparator = + new IntermediateSortTempRowComparator(noDictSortColumnMapping, noDictDataType); } public boolean hasNext() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java index 02ffd68..baa9e71 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java @@ -45,7 +45,8 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder { this.rowPage = rowPage; LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize); this.comparator = new IntermediateSortTempRowComparator( - rowPage.getTableFieldStat().getIsSortColNoDictFlags()); + rowPage.getTableFieldStat().getIsSortColNoDictFlags(), + rowPage.getTableFieldStat().getNoDictDataType()); this.rowPage.setReadConvertedNoSortField(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index 7c3c056..a991d4c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -109,7 +109,8 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { this.tableFieldStat = new TableFieldStat(parameters); this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); this.executorService = Executors.newFixedThreadPool(1); - comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn()); + comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(), + parameters.getNoDictDataType()); this.convertNoSortFields = convertNoSortFields; initialize(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java index ac13d24..7683bbc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -273,19 +273,19 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces int dictIndex = 0; int nonDicIndex = 0; int[] dim 
= new int[this.dimensionCount]; - byte[][] nonDicArray = new byte[this.noDictWithComplextCount][]; + Object[] nonDicArray = new Object[this.noDictWithComplextCount]; // read dimension values int dimCount = 0; for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { if (isNoDictionaryDimensionColumn[dimCount]) { - nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); + nonDicArray[nonDicIndex++] = row.getObject(dimCount); } else { dim[dictIndex++] = (int) row.getObject(dimCount); } } for (; dimCount < this.dimensionWithComplexCount; dimCount++) { - nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); + nonDicArray[nonDicIndex++] = row.getObject(dimCount); } Object[] measures = new Object[measureCount]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java index e3bc97f..ae9ec3d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java @@ -44,6 +44,7 @@ import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerI import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; import org.apache.carbondata.processing.util.CarbonBadRecordUtil; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.commons.lang3.StringUtils; @@ -134,12 +135,16 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte // sort the range bounds (sort in carbon is a little different from what we think) Arrays.sort(convertedSortColumnRanges, new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(), - sortColumnRangeInfo.getIsSortColumnNoDict())); + sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil + .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(), + configuration.getTableIdentifier().getTableName()))); // range partitioner to dispatch rows by sort columns this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges, new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(), - sortColumnRangeInfo.getIsSortColumnNoDict())); + sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil + .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(), + configuration.getTableIdentifier().getTableName()))); } // only convert sort column fields http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index ce8b62f..b921675 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -266,8 +266,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce Object[] newData = new Object[data.length]; for (int i = 0; i < data.length; i++) { if (i < noDictionaryMapping.length && noDictionaryMapping[i]) { - newData[i] = DataTypeUtil - 
.getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]); + if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) { + // keep the no dictionary measure column as original data + newData[i] = data[orderOfData[i]]; + } else { + newData[i] = DataTypeUtil + .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]); + } } else { // if this is a complex column then recursively comver the data into Byte Array. if (dataTypes[i].isComplexType()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 0fc229a..1aa6da8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -91,6 +91,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { * boolean mapping for no dictionary columns in schema */ private boolean[] noDictionaryColMapping; + + private boolean[] sortColumnMapping; /** * boolean mapping for long string dimension */ @@ -275,7 +277,15 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { preparedRow[i] = dictionaryValues[dictionaryIndex++]; } else { // no dictionary dims - preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++); + byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++); + if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) { + // no dictionary measure columns are expected as original data + preparedRow[i] = DataTypeUtil + 
.getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex, + dims.getDataType()); + } else { + preparedRow[i] = noDictionaryKeyByIndex; + } } } // fill all the measures @@ -357,6 +367,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { measureCount = carbonTable.getMeasureByTableName(tableName).size(); List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName); noDictionaryColMapping = new boolean[dimensions.size()]; + sortColumnMapping = new boolean[dimensions.size()]; isVarcharDimMapping = new boolean[dimensions.size()]; int i = 0; for (CarbonDimension dimension : dimensions) { @@ -364,6 +375,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { i++; continue; } + if (dimension.isSortColumn()) { + sortColumnMapping[i] = true; + } noDictionaryColMapping[i] = true; if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) { isVarcharDimMapping[i] = true; @@ -395,8 +409,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { return SortParameters .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName, dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount, - noDictionaryCount, segmentId, - carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true); + noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping, + sortColumnMapping, isVarcharDimMapping, true); } /** @@ -404,14 +418,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { * sort temp files */ private void initializeFinalThreadMergerForMergeSort() { - boolean[] noDictionarySortColumnMapping = null; - if (noDictionaryColMapping.length == this.segmentProperties.getNumberOfSortColumns()) { - noDictionarySortColumnMapping = noDictionaryColMapping; - } else { - noDictionarySortColumnMapping = new boolean[this.segmentProperties.getNumberOfSortColumns()]; - 
System.arraycopy(noDictionaryColMapping, 0, - noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length); - } + boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil + .getNoDictSortColMapping(carbonTable.getDatabaseName(), carbonTable.getTableName()); sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping); String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation, CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index b877d52..2911c05 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; import org.apache.carbondata.core.util.ByteUtil; @@ -53,6 +54,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { private SegmentProperties segprop; private CarbonLoadModel loadModel; private PartitionSpec partitionSpec; + + CarbonColumn[] noDicAndComplexColumns; /** * record holder heap */ @@ 
-86,6 +89,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true); carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId()); + this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns(); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } @@ -200,7 +204,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { * @throws SliceMergerException */ private void addRow(Object[] carbonTuple) throws SliceMergerException { - CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop); + CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop, noDicAndComplexColumns); try { this.dataHandler.addDataToStore(row); } catch (CarbonDataWriterException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index 2dc79a3..00fbc7a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import 
org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.store.CarbonDataFileAttributes; import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; @@ -37,6 +38,8 @@ public class RowResultProcessor { private CarbonFactHandler dataHandler; private SegmentProperties segmentProperties; + private CarbonColumn[] noDicAndComplexColumns; + private static final LogService LOGGER = LogServiceFactory.getLogService(RowResultProcessor.class.getName()); @@ -59,6 +62,7 @@ public class RowResultProcessor { //Note: set compaction flow just to convert decimal type carbonFactDataHandlerModel.setCompactionFlow(true); carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId()); + noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns(); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } @@ -97,7 +101,8 @@ public class RowResultProcessor { } private void addRow(Object[] carbonTuple) throws CarbonDataWriterException { - CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties); + CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties, + noDicAndComplexColumns); try { this.dataHandler.addDataToStore(row); } catch (CarbonDataWriterException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java index 9b6d1e8..54fa99e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java @@ -19,7 +19,10 @@ package org.apache.carbondata.processing.sort.sortdata; import java.util.Comparator; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.comparator.SerializableComparator; import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; /** @@ -31,11 +34,15 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat */ private boolean[] isSortColumnNoDictionary; + private DataType[] noDicSortDataTypes; + /** * @param isSortColumnNoDictionary isSortColumnNoDictionary */ - public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) { + public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary, + DataType[] noDicSortDataTypes) { this.isSortColumnNoDictionary = isSortColumnNoDictionary; + this.noDicSortDataTypes = noDicSortDataTypes; } /** @@ -45,18 +52,31 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat int diff = 0; int dictIndex = 0; int nonDictIndex = 0; + int noDicTypeIdx = 0; for (boolean isNoDictionary : isSortColumnNoDictionary) { if (isNoDictionary) { - byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex]; - byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex]; - nonDictIndex++; + if (DataTypeUtil.isPrimitiveColumn(noDicSortDataTypes[noDicTypeIdx])) { + // use data types based comparator for the no dictionary measure columns + SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator + .getComparator(noDicSortDataTypes[noDicTypeIdx]); + int difference = comparator.compare(rowA.getNoDictSortDims()[nonDictIndex], + rowB.getNoDictSortDims()[nonDictIndex]); + if (difference != 0) { + return difference; + } + noDicTypeIdx++; + 
} else { + byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex]; + byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex]; - int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); - if (difference != 0) { - return difference; + int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); + if (difference != 0) { + return difference; + } } + nonDictIndex++; } else { int dimFieldA = rowA.getDictSortDims()[dictIndex]; int dimFieldB = rowB.getDictSortDims()[dictIndex]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java index f47ecc7..4dff644 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java @@ -20,7 +20,10 @@ package org.apache.carbondata.processing.sort.sortdata; import java.io.Serializable; import java.util.Comparator; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.comparator.SerializableComparator; public class NewRowComparator implements Comparator<Object[]>, Serializable { private static final long serialVersionUID = -1739874611112709436L; @@ -28,13 +31,20 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable { /** * mapping of dictionary dimensions and no dictionary of sort_column. 
*/ - private boolean[] noDictionarySortColumnMaping; + private boolean[] noDicDimColMapping; + + private DataType[] noDicDataTypes; + + private boolean[] noDicSortColumnMapping; /** - * @param noDictionarySortColumnMaping + * @param noDicDimColMapping */ - public NewRowComparator(boolean[] noDictionarySortColumnMaping) { - this.noDictionarySortColumnMaping = noDictionarySortColumnMaping; + public NewRowComparator(boolean[] noDicDimColMapping, boolean[] noDicSortColumnMapping, + DataType[] noDicDataTypes) { + this.noDicDimColMapping = noDicDimColMapping; + this.noDicSortColumnMapping = noDicSortColumnMapping; + this.noDicDataTypes = noDicDataTypes; } /** @@ -43,15 +53,31 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable { public int compare(Object[] rowA, Object[] rowB) { int diff = 0; int index = 0; + int dataTypeIdx = 0; + int noDicSortIdx = 0; - for (boolean isNoDictionary : noDictionarySortColumnMaping) { - if (isNoDictionary) { - byte[] byteArr1 = (byte[]) rowA[index]; - byte[] byteArr2 = (byte[]) rowB[index]; + for (int i = 0; i < noDicDimColMapping.length; i++) { + if (noDicDimColMapping[i]) { + if (noDicSortColumnMapping[noDicSortIdx++]) { + if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[dataTypeIdx])) { + // use data types based comparator for the no dictionary measure columns + SerializableComparator comparator = + org.apache.carbondata.core.util.comparator.Comparator + .getComparator(noDicDataTypes[dataTypeIdx]); + int difference = comparator.compare(rowA[index], rowB[index]); + if (difference != 0) { + return difference; + } + dataTypeIdx++; + } else { + byte[] byteArr1 = (byte[]) rowA[index]; + byte[] byteArr2 = (byte[]) rowB[index]; - int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); - if (difference != 0) { - return difference; + int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); + if (difference != 0) { + return difference; + } + } } } else { int dimFieldA = (int) rowA[index]; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java index a5caf7b..730c729 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java @@ -203,7 +203,9 @@ public class SortDataRows { toSort = new Object[entryCount][]; System.arraycopy(recordHolderList, 0, toSort, 0, entryCount); if (parameters.getNumberOfNoDictSortColumns() > 0) { - Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn())); + Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(), + parameters.getNoDictionarySortColumn(), + parameters.getNoDictDataType())); } else { Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); } @@ -315,7 +317,8 @@ public class SortDataRows { long startTime = System.currentTimeMillis(); if (parameters.getNumberOfNoDictSortColumns() > 0) { Arrays.sort(recordHolderArray, - new NewRowComparator(parameters.getNoDictionarySortColumn())); + new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(), + parameters.getNoDictionarySortColumn(), parameters.getNoDictDataType())); } else { Arrays.sort(recordHolderArray, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));