Repository: carbondata Updated Branches: refs/heads/master 5725b7eb1 -> 982d03fea
[CARBONDATA-2163][CARBONDATA-2164] Remove spark dependency in core and processing modules This closes #2070 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/982d03fe Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/982d03fe Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/982d03fe Branch: refs/heads/master Commit: 982d03fea2a3b0869b81ce1a0f896b522a25af64 Parents: 5725b7e Author: Jacky Li <[email protected]> Authored: Mon Mar 19 00:11:49 2018 +0800 Committer: QiangCai <[email protected]> Committed: Fri Mar 23 22:21:22 2018 +0800 ---------------------------------------------------------------------- core/pom.xml | 39 +++++++--- ...feVariableLengthDimensionDataChunkStore.java | 4 +- ...afeVariableLengthDimesionDataChunkStore.java | 4 +- .../carbondata/core/memory/MemoryBlock.java | 8 -- .../impl/AbstractScannedResultCollector.java | 6 +- ...structureBasedDictionaryResultCollector.java | 10 ++- .../RestructureBasedRawResultCollector.java | 7 +- .../RestructureBasedVectorResultCollector.java | 9 +-- .../core/scan/complextypes/ArrayQueryType.java | 11 +-- .../scan/complextypes/PrimitiveQueryType.java | 26 ------- .../core/scan/complextypes/StructQueryType.java | 18 +---- .../scan/executor/util/RestructureUtil.java | 7 +- .../core/scan/filter/GenericQueryType.java | 4 - .../apache/carbondata/core/util/CarbonUtil.java | 3 +- .../carbondata/core/util/DataTypeConverter.java | 11 ++- .../core/util/DataTypeConverterImpl.java | 41 +++++++++- .../carbondata/core/util/DataTypeUtil.java | 6 +- .../complextypes/PrimitiveQueryTypeTest.java | 29 ------- .../scan/complextypes/StructQueryTypeTest.java | 5 -- .../conditional/EqualToExpressionUnitTest.java | 3 +- .../GreaterThanEqualToExpressionUnitTest.java | 3 +- .../GreaterThanExpressionUnitTest.java | 5 +- .../conditional/InExpressionUnitTest.java | 5 +- .../LessThanEqualToExpressionUnitTest.java | 5 +- 
.../conditional/LessThanExpressionUnitTest.java | 5 +- .../NotEqualsExpressionUnitTest.java | 5 +- .../conditional/NotInExpressionUnitTest.java | 5 +- .../carbondata/core/util/DataTypeUtilTest.java | 2 +- hadoop/pom.xml | 4 + integration/presto/pom.xml | 5 ++ .../presto/CarbonColumnVectorWrapper.java | 64 ++++++++++++++++ .../spark/util/SparkDataTypeConverterImpl.java | 42 ++++++++-- .../spark/rdd/CarbonScanPartitionRDD.scala | 3 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 2 + .../vectorreader/ColumnarVectorWrapper.java | 11 +-- .../spark/rdd/CarbonDataRDDFactory.scala | 1 - .../spark/sql/SparkUnknownExpression.scala | 4 +- .../CarbonAlterTableDataTypeChangeCommand.scala | 57 ++++++++++++-- .../datasources/CarbonFileFormat.scala | 2 +- .../sql/hive/CarbonPreAggregateRules.scala | 3 +- processing/pom.xml | 6 +- .../loading/TableProcessingOperations.java | 2 - .../sort/unsafe/sort/SortDataFormat.java | 80 ++++++++++++++++++++ .../loading/sort/unsafe/sort/TimSort.java | 2 - .../unsafe/sort/UnsafeIntSortDataFormat.java | 2 - .../merger/CompactionResultSortProcessor.java | 5 +- .../partition/spliter/CarbonSplitExecutor.java | 7 +- .../carbondata/processing/store/TablePage.java | 4 +- 48 files changed, 392 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 824de0d..d9c756e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -49,11 +49,6 @@ <artifactId>gson</artifactId> <version>2.3.1</version> </dependency> - <!--<dependency>--> - <!--<groupId>io.netty</groupId>--> - <!--<artifactId>netty-all</artifactId>--> - <!--<version>4.1.8.Final</version>--> - <!--</dependency>--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> @@ -63,6 +58,11 @@ <artifactId>hadoop-hdfs</artifactId> 
</dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> <version>${snappy.version}</version> @@ -78,10 +78,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - </dependency> - <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.7</version> @@ -96,6 +92,31 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.5</version> + </dependency> + <dependency> + <groupId>org.roaringbitmap</groupId> + <artifactId>RoaringBitmap</artifactId> + <version>0.5.11</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.0.42.Final</version> + </dependency> + <dependency> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + <version>1.3.0</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java index f498c6e..09230dd 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java @@ -153,8 +153,8 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens } else if (dt == DataTypes.INT) { vector.putInt(vectorRow, ByteUtil.toInt(data, currentDataOffset, length)); } else if (dt == DataTypes.LONG) { - vector.putLong(vectorRow, DataTypeUtil - .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), + vector.putLong(vectorRow, + DataTypeUtil.getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentDataOffset, length)); } else if (dt == DataTypes.TIMESTAMP) { vector.putLong(vectorRow, ByteUtil.toLong(data, currentDataOffset, length) * 1000L); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java index e1eb378..0321ee7 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java @@ -177,8 +177,8 @@ public class UnsafeVariableLengthDimesionDataChunkStore } else if (dt == DataTypes.INT) { vector.putInt(vectorRow, ByteUtil.toInt(value, 0, value.length)); } else if (dt == DataTypes.LONG) { - vector.putLong(vectorRow, DataTypeUtil - 
.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0, + vector.putLong(vectorRow, + DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0, value.length)); } else if (dt == DataTypes.TIMESTAMP) { vector.putLong(vectorRow, ByteUtil.toLong(value, 0, value.length) * 1000L); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java index d6cb184..fd4f06c 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java @@ -19,8 +19,6 @@ package org.apache.carbondata.core.memory; import javax.annotation.Nullable; -import org.apache.spark.unsafe.Platform; - /** * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size. @@ -55,10 +53,4 @@ public class MemoryBlock extends MemoryLocation { this.isFreed = freedStatus; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { - return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8); - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java index 694271e..9ac5a06 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java @@ -75,7 +75,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol Object defaultValue = measureInfo.getDefaultValues()[i]; if (null != defaultValue && DataTypes.isDecimal(measureInfo.getMeasureDataTypes()[i])) { // convert data type as per the computing engine - defaultValue = DataTypeUtil.getDataTypeConverter().convertToDecimal(defaultValue); + defaultValue = + DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(defaultValue); } msrValues[i + offset] = defaultValue; } @@ -100,7 +101,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol bigDecimalMsrValue.setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP); } // convert data type as per the computing engine - return DataTypeUtil.getDataTypeConverter().convertToDecimal(bigDecimalMsrValue); + return DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal( + bigDecimalMsrValue); } else { return dataChunk.getDouble(index); } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java index 8b42a4a..f0d6898 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java @@ -20,11 +20,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.executor.util.RestructureUtil; import org.apache.carbondata.core.scan.filter.GenericQueryType; import org.apache.carbondata.core.scan.model.ProjectionMeasure; import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.util.DataTypeUtil; /** * class for handling restructure scenarios for filling result @@ -86,6 +88,9 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe if (dictionaryEncodingArray[i] || directDictionaryEncodingArray[i]) { row[order[i]] = dimensionInfo.getDefaultValues()[i]; dictionaryColumnIndex++; + } else if (queryDimensions[i].getDimension().getDataType() == DataTypes.STRING) { + row[order[i]] = DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8String( + (byte[])dimensionInfo.getDefaultValues()[i]); } else { row[order[i]] = dimensionInfo.getDefaultValues()[i]; } @@ -119,8 +124,11 @@ public class RestructureBasedDictionaryResultCollector extends 
DictionaryBasedRe scannedResult.getMeasureChunk(measureInfo.getMeasureOrdinals()[measureExistIndex]), scannedResult.getCurrentRowId(), queryMeasure.getMeasure()); measureExistIndex++; - } else { + } else if (DataTypes.isDecimal(measureInfo.getMeasureDataTypes()[i])) { // if not then get the default value + msrValues[i + offset] = DataTypeUtil.getDataTypeConverter() + .convertFromBigDecimalToDecimal(measureDefaultValues[i]); + } else { msrValues[i + offset] = measureDefaultValues[i]; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java index 6544a75..d776b5e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java @@ -34,9 +34,9 @@ import org.apache.carbondata.core.scan.model.ProjectionDimension; import org.apache.carbondata.core.scan.model.ProjectionMeasure; import org.apache.carbondata.core.scan.result.BlockletScannedResult; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.commons.lang3.ArrayUtils; -import org.apache.spark.unsafe.types.UTF8String; /** * It is not a collector it is just a scanned result holder. 
@@ -239,10 +239,11 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector byte[] newColumnDefaultValue = null; Object defaultValue = dimensionInfo.getDefaultValues()[i]; if (null != defaultValue) { - newColumnDefaultValue = ((UTF8String) defaultValue).getBytes(); + newColumnDefaultValue = (byte[]) defaultValue; } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) { newColumnDefaultValue = - UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL).getBytes(); + DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes( + CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY); } else { newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java index 61a2992..b95bffe 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.core.scan.collector.impl; +import java.math.BigDecimal; import java.util.List; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; @@ -31,9 +32,6 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; -import org.apache.spark.sql.types.Decimal; -import 
org.apache.spark.unsafe.types.UTF8String; - /** * It is not a collector it is just a scanned result holder. */ @@ -206,8 +204,7 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size, (long) defaultValue); } else { - vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size, - ((UTF8String) defaultValue).getBytes()); + vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size, (byte[])defaultValue); } } else { vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size); @@ -240,7 +237,7 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector (long) defaultValue); } else if (DataTypes.isDecimal(dataType)) { vector.putDecimals(columnVectorInfo.vectorOffset, columnVectorInfo.size, - ((Decimal) defaultValue).toJavaBigDecimal(), measure.getPrecision()); + (BigDecimal) defaultValue, measure.getPrecision()); } else if (dataType == DataTypes.BOOLEAN) { vector.putBoolean(columnVectorInfo.vectorOffset, (Boolean) defaultValue); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java index 30dd1dd..24c1c9b 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java @@ -24,10 +24,7 @@ import java.nio.ByteBuffer; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.scan.filter.GenericQueryType; import 
org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; - -import org.apache.spark.sql.catalyst.util.GenericArrayData; -import org.apache.spark.sql.types.ArrayType; -import org.apache.spark.sql.types.DataType; +import org.apache.carbondata.core.util.DataTypeUtil; public class ArrayQueryType extends ComplexQueryType implements GenericQueryType { @@ -82,10 +79,6 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType return children.getColsCount() + 1; } - @Override public DataType getSchemaType() { - return new ArrayType(null, true); - } - @Override public void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException { readBlockDataChunk(blockChunkHolder); @@ -101,7 +94,7 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType for (int i = 0; i < dataLength; i++) { data[i] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData); } - return new GenericArrayData(data); + return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java index b8aa912..8c75caf 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java @@ -30,14 +30,6 @@ import org.apache.carbondata.core.scan.filter.GenericQueryType; import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.spark.sql.types.BooleanType$; -import org.apache.spark.sql.types.DataType; 
-import org.apache.spark.sql.types.DateType$; -import org.apache.spark.sql.types.DoubleType$; -import org.apache.spark.sql.types.IntegerType$; -import org.apache.spark.sql.types.LongType$; -import org.apache.spark.sql.types.TimestampType$; - public class PrimitiveQueryType extends ComplexQueryType implements GenericQueryType { private String name; @@ -95,24 +87,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery dataOutputStream.write(currentVal); } - @Override public DataType getSchemaType() { - if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) { - return IntegerType$.MODULE$; - } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) { - return DoubleType$.MODULE$; - } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { - return LongType$.MODULE$; - } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) { - return BooleanType$.MODULE$; - } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) { - return TimestampType$.MODULE$; - } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) { - return DateType$.MODULE$; - } else { - return IntegerType$.MODULE$; - } - } - @Override public void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException { readBlockDataChunk(blockChunkHolder); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java index 1d4f141..1064694 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java @@ -26,12 +26,7 @@ import java.util.List; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.scan.filter.GenericQueryType; import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; - -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; +import org.apache.carbondata.core.util.DataTypeUtil; public class StructQueryType extends ComplexQueryType implements GenericQueryType { @@ -97,15 +92,6 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp } } - @Override public DataType getSchemaType() { - StructField[] fields = new StructField[children.size()]; - for (int i = 0; i < children.size(); i++) { - fields[i] = new StructField(children.get(i).getName(), null, true, - Metadata.empty()); - } - return new StructType(fields); - } - @Override public void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException { readBlockDataChunk(blockChunkHolder); @@ -122,6 +108,6 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp fields[i] = children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData); } - return new GenericInternalRow(fields); + return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java index 
e67d822..d7247b2 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java @@ -41,8 +41,6 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.commons.lang3.ArrayUtils; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.unsafe.types.UTF8String; /** * Utility class for restructuring @@ -231,7 +229,8 @@ public class RestructureUtil { long timestampValue = ByteUtil.toLong(defaultValue, 0, defaultValue.length); noDictionaryDefaultValue = timestampValue * 1000L; } else { - noDictionaryDefaultValue = UTF8String.fromBytes(defaultValue); + noDictionaryDefaultValue = + DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(defaultValue); } } return noDictionaryDefaultValue; @@ -318,7 +317,7 @@ public class RestructureUtil { if (columnSchema.getScale() > decimal.scale()) { decimal = decimal.setScale(columnSchema.getScale(), RoundingMode.HALF_UP); } - measureDefaultValue = Decimal.apply(decimal); + measureDefaultValue = decimal; } else { value = new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); Double parsedValue = Double.valueOf(value); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java index 214bd9d..b5d8d82 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java @@ -24,8 +24,6 @@ import java.nio.ByteBuffer; import 
org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; -import org.apache.spark.sql.types.DataType; - public interface GenericQueryType { String getName(); @@ -43,8 +41,6 @@ public interface GenericQueryType { void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks, int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException; - DataType getSchemaType(); - void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException; Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 06511f8..5bb1c8e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -95,6 +95,7 @@ import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TIOStreamTransport; +import scala.StringContext; public final class CarbonUtil { @@ -656,7 +657,7 @@ public final class CarbonUtil { * @return */ public static String unescapeChar(String parseStr) { - return scala.StringContext.treatEscapes(parseStr); + return StringContext.treatEscapes(parseStr); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java 
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java index 8c9e058..7c63860 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java @@ -19,9 +19,16 @@ package org.apache.carbondata.core.util; public interface DataTypeConverter { - Object convertToDecimal(Object data); - Object convertFromByteToUTF8String(Object data); + Object convertFromStringToDecimal(Object data); + Object convertFromBigDecimalToDecimal(Object data); + Object convertFromDecimalToBigDecimal(Object data); + + Object convertFromByteToUTF8String(byte[] data); + byte[] convertFromByteToUTF8Bytes(byte[] data); byte[] convertFromStringToByte(Object data); Object convertFromStringToUTF8String(Object Data); + Object wrapWithGenericArrayData(Object data); + Object wrapWithGenericRow(Object[] fields); + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java index f1603dc..ea5740d 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java @@ -26,7 +26,8 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable { private static final long serialVersionUID = -1718154403432354200L; - public Object convertToDecimal(Object data) { + @Override + public Object convertFromStringToDecimal(Object data) { if (null == data) { return null; } @@ -36,13 +37,35 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable { return new BigDecimal(data.toString()); } - public Object 
convertFromByteToUTF8String(Object data) { + @Override + public Object convertFromBigDecimalToDecimal(Object data) { if (null == data) { return null; } - return new String((byte[]) data, CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + if (data instanceof BigDecimal) { + return data; + } + return new BigDecimal(data.toString()); + } + + @Override public Object convertFromDecimalToBigDecimal(Object data) { + return convertFromBigDecimalToDecimal(data); + } + + @Override + public Object convertFromByteToUTF8String(byte[] data) { + if (null == data) { + return null; + } + return new String(data, CarbonCommonConstants.DEFAULT_CHARSET_CLASS); } + @Override + public byte[] convertFromByteToUTF8Bytes(byte[] data) { + return data; + } + + @Override public byte[] convertFromStringToByte(Object data) { if (null == data) { return null; @@ -50,10 +73,22 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable { return data.toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); } + @Override public Object convertFromStringToUTF8String(Object data) { if (null == data) { return null; } return data.toString(); } + + @Override + public Object wrapWithGenericArrayData(Object data) { + return data; + } + + @Override + public Object wrapWithGenericRow(Object[] fields) { + return fields; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 4602cc4..a4d6094 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -103,7 +103,7 @@ public final class DataTypeUtil { new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), 
RoundingMode.HALF_UP); BigDecimal decimal = normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision()); if (useConverter) { - return converter.convertToDecimal(decimal); + return converter.convertFromBigDecimalToDecimal(decimal); } else { return decimal; } @@ -309,7 +309,7 @@ public final class DataTypeUtil { if (data.isEmpty()) { return null; } - return converter.convertToDecimal(data); + return converter.convertFromStringToDecimal(data); } else { return converter.convertFromStringToUTF8String(data); } @@ -544,7 +544,7 @@ public final class DataTypeUtil { if (dimension.getColumnSchema().getScale() > javaDecVal.scale()) { javaDecVal = javaDecVal.setScale(dimension.getColumnSchema().getScale()); } - return getDataTypeConverter().convertToDecimal(javaDecVal); + return getDataTypeConverter().convertFromBigDecimalToDecimal(javaDecVal); } else { return getDataTypeConverter().convertFromByteToUTF8String(dataInBytes); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java index 5149136..3236f16 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java @@ -29,11 +29,6 @@ import org.apache.carbondata.core.util.DataTypeUtil; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.BooleanType$; -import org.apache.spark.sql.types.DoubleType$; -import org.apache.spark.sql.types.IntegerType$; -import org.apache.spark.sql.types.LongType$; -import org.apache.spark.sql.types.TimestampType$; import org.junit.BeforeClass; import org.junit.Test; @@ 
-78,30 +73,6 @@ public class PrimitiveQueryTypeTest { } - @Test public void testGetDataTypeForDefault() { - assertEquals(IntegerType$.MODULE$, primitiveQueryType.getSchemaType()); - } - - @Test public void testGetDataTypeForInt() { - assertEquals(IntegerType$.MODULE$, primitiveQueryTypeForInt.getSchemaType()); - } - - @Test public void testGetDataTypeForDouble() { - assertEquals(DoubleType$.MODULE$, primitiveQueryTypeForDouble.getSchemaType()); - } - - @Test public void testGetDataTypeForBoolean() { - assertEquals(BooleanType$.MODULE$, primitiveQueryTypeForBoolean.getSchemaType()); - } - - @Test public void testGetDataTypeForTimeStamp() { - assertEquals(TimestampType$.MODULE$, primitiveQueryTypeForTimeStamp.getSchemaType()); - } - - @Test public void testGetDataTypeForLong() { - assertEquals(LongType$.MODULE$, primitiveQueryTypeForLong.getSchemaType()); - } - @Test public void testGetDataBasedOnDataTypeFromSurrogates() { ByteBuffer surrogateData = ByteBuffer.allocate(10); surrogateData.put(3, (byte) 1); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java index 3215ef0..b09d9dd 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java @@ -64,9 +64,4 @@ public class StructQueryTypeTest { assertEquals(expectedValue, actualValue); } - @Test public void testGetSchemaType() { - List children = new ArrayList(); - children.add(null); - assertNotNull(structQueryType.getSchemaType()); - } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java index b2843bc..f349fc9 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java @@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.Decimal; import org.junit.Test; import static junit.framework.Assert.assertEquals; @@ -297,7 +296,7 @@ public class EqualToExpressionUnitTest { right.setColIndex(0); equalToExpression = new EqualToExpression(right, right); RowImpl value = new RowImpl(); - Decimal[] row = new Decimal[] { Decimal.apply(12345.0) }; + BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) }; Object objectRow[] = { row }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java index adff4e4..95e5935 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java +++ 
b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java @@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.Decimal; import org.junit.Test; import static junit.framework.Assert.assertEquals; @@ -210,7 +209,7 @@ public class GreaterThanEqualToExpressionUnitTest { left.setColIndex(1); greaterThanEqualToExpression = new GreaterThanEqualToExpression(left, right); RowImpl value = new RowImpl(); - Decimal[] row = new Decimal[] { Decimal.apply(12345.0) }; + BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) }; Object objectRow[] = { row, row }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java index b5decc2..38a7222 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java @@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.Decimal; import org.junit.Test; import static junit.framework.Assert.assertEquals; @@ -256,8 +255,8 @@ public class GreaterThanExpressionUnitTest { left.setColIndex(1); greaterThanExpression = new GreaterThanExpression(left, right); RowImpl value = new RowImpl(); - Decimal[] row = new Decimal[] { Decimal.apply(12345.0) }; - Decimal[] row1 = new Decimal[] { 
Decimal.apply(123451245.0) }; + BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) }; + BigDecimal[] row1 = new BigDecimal[] { new BigDecimal(123451245.0) }; Object objectRow[] = { row1, row }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java index 4f4203d..3b48e44 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java @@ -33,7 +33,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.Decimal; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -209,8 +208,8 @@ public class InExpressionUnitTest { right.setColIndex(1); inExpression = new InExpression(left, right); RowImpl value = new RowImpl(); - Decimal row = Decimal.apply(123452154.0); - Decimal row1 = Decimal.apply(123452154.0); + BigDecimal row = new BigDecimal(123452154.0); + BigDecimal row1 = new BigDecimal(123452154.0); Object objectRow[] = { row, row1 }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java 
b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java index f1001ae..d3c8cf4 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java @@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.Decimal; import org.junit.Test; import static junit.framework.Assert.assertEquals; @@ -259,8 +258,8 @@ public class LessThanEqualToExpressionUnitTest { left.setColIndex(1); lessThanEqualToExpression = new LessThanEqualToExpression(left, right); RowImpl value = new RowImpl(); - Decimal[] row = new Decimal[] { Decimal.apply(46851.2) }; - Decimal[] row1 = new Decimal[] { Decimal.apply(45821.02) }; + BigDecimal[] row = new BigDecimal[] { new BigDecimal(46851.2) }; + BigDecimal[] row1 = new BigDecimal[] { new BigDecimal(45821.02) }; Object objectRow[] = { row1, row }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java index 4dfeaa3..e038a56 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java @@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import 
org.apache.spark.sql.types.Decimal; import org.junit.Test; import static junit.framework.Assert.assertEquals; @@ -256,8 +255,8 @@ public class LessThanExpressionUnitTest { left.setColIndex(1); lessThanExpression = new LessThanExpression(left, right); RowImpl value = new RowImpl(); - Decimal[] row = new Decimal[] { Decimal.apply(256324.0) }; - Decimal[] row1 = new Decimal[] { Decimal.apply(123451245.0) }; + BigDecimal[] row = new BigDecimal[] { new BigDecimal(256324.0) }; + BigDecimal[] row1 = new BigDecimal[] { new BigDecimal(123451245.0) }; Object objectRow[] = { row1, row }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java index c9707e9..34448f4 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java @@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.Decimal; import org.junit.Test; import static junit.framework.Assert.assertEquals; @@ -266,8 +265,8 @@ public class NotEqualsExpressionUnitTest { left.setColIndex(0); notEqualsExpression = new NotEqualsExpression(left, right); RowImpl value = new RowImpl(); - Decimal[] row = new Decimal[] { Decimal.apply(12345.0) }; - Decimal[] row1 = new Decimal[] { Decimal.apply(1235445.0) }; + BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) }; + BigDecimal[] row1 = new BigDecimal[] { new 
BigDecimal(1235445.0) }; Object objectRow[] = { row, row1 }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java index 207ce5d..d31361b 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java @@ -33,7 +33,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl; import mockit.Mock; import mockit.MockUp; -import org.apache.spark.sql.types.Decimal; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -208,8 +207,8 @@ public class NotInExpressionUnitTest { right.setColIndex(1); notInExpression = new NotInExpression(left, right); RowImpl value = new RowImpl(); - Decimal row = Decimal.apply(123452154.0); - Decimal row1 = Decimal.apply(1234521215454.0); + BigDecimal row = new BigDecimal(123452154.0); + BigDecimal row1 = new BigDecimal(1234521215454.0); Object objectRow[] = { row, row1 }; value.setValues(objectRow); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java index 0c8f926..c67da7d 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java +++ 
b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java @@ -71,7 +71,7 @@ public class DataTypeUtilTest { java.math.BigDecimal javaDecVal = new java.math.BigDecimal(1); scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal); assertEquals(getDataBasedOnDataType("1", DataTypes.createDefaultDecimalType()), - DataTypeUtil.getDataTypeConverter().convertToDecimal(scalaDecVal)); + DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(scalaDecVal)); assertEquals(getDataBasedOnDataType("default", DataTypes.NULL), DataTypeUtil.getDataTypeConverter().convertFromStringToUTF8String("default")); assertEquals(getDataBasedOnDataType((String) null, DataTypes.NULL), null); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop/pom.xml b/hadoop/pom.xml index c3964c5..916b9db 100644 --- a/hadoop/pom.xml +++ b/hadoop/pom.xml @@ -40,6 +40,10 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/presto/pom.xml ---------------------------------------------------------------------- diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml index c3c7c64..de69df0 100644 --- a/integration/presto/pom.xml +++ b/integration/presto/pom.xml @@ -488,6 +488,11 @@ <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> </dependencies> 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java index 78a1ea8..4560241 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java @@ -18,11 +18,30 @@ package org.apache.carbondata.presto; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.NullType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.TimestampType; + public class CarbonColumnVectorWrapper implements CarbonColumnVector { private CarbonColumnVectorImpl columnVector; @@ -218,4 +237,49 @@ public class CarbonColumnVectorWrapper implements 
CarbonColumnVector { @Override public void setFilteredRowsExist(boolean filteredRowsExist) { this.filteredRowsExist = filteredRowsExist; } + + // TODO: this is copied from carbondata-spark-common module, use presto type instead of this + private org.apache.carbondata.core.metadata.datatype.DataType + convertSparkToCarbonDataType(org.apache.spark.sql.types.DataType dataType) { + if (dataType instanceof StringType) { + return DataTypes.STRING; + } else if (dataType instanceof ShortType) { + return DataTypes.SHORT; + } else if (dataType instanceof IntegerType) { + return DataTypes.INT; + } else if (dataType instanceof LongType) { + return DataTypes.LONG; + } else if (dataType instanceof DoubleType) { + return DataTypes.DOUBLE; + } else if (dataType instanceof FloatType) { + return DataTypes.FLOAT; + } else if (dataType instanceof DateType) { + return DataTypes.DATE; + } else if (dataType instanceof BooleanType) { + return DataTypes.BOOLEAN; + } else if (dataType instanceof TimestampType) { + return DataTypes.TIMESTAMP; + } else if (dataType instanceof NullType) { + return DataTypes.NULL; + } else if (dataType instanceof DecimalType) { + DecimalType decimal = (DecimalType) dataType; + return DataTypes.createDecimalType(decimal.precision(), decimal.scale()); + } else if (dataType instanceof ArrayType) { + org.apache.spark.sql.types.DataType elementType = ((ArrayType) dataType).elementType(); + return DataTypes.createArrayType(convertSparkToCarbonDataType(elementType)); + } else if (dataType instanceof StructType) { + StructType structType = (StructType) dataType; + org.apache.spark.sql.types.StructField[] fields = structType.fields(); + List<StructField> carbonFields = new ArrayList<>(); + for (org.apache.spark.sql.types.StructField field : fields) { + carbonFields.add( + new StructField( + field.name(), + convertSparkToCarbonDataType(field.dataType()))); + } + return DataTypes.createStructType(carbonFields); + } else { + throw new UnsupportedOperationException("getting 
" + dataType + " from presto"); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java index 3670e11..6e9e0a6 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java @@ -18,9 +18,12 @@ package org.apache.carbondata.spark.util; import java.io.Serializable; +import java.math.BigDecimal; import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.GenericArrayData; import org.apache.spark.unsafe.types.UTF8String; /** @@ -30,14 +33,26 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri private static final long serialVersionUID = -4379212832935070583L; - public Object convertToDecimal(Object data) { + @Override + public Object convertFromStringToDecimal(Object data) { + java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString()); + return org.apache.spark.sql.types.Decimal.apply(javaDecVal); + } + + @Override + public Object convertFromBigDecimalToDecimal(Object data) { if (null == data) { return null; } - java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString()); - return org.apache.spark.sql.types.Decimal.apply(javaDecVal); + return org.apache.spark.sql.types.Decimal.apply((BigDecimal)data); + } + + @Override + public Object convertFromDecimalToBigDecimal(Object data) { + return ((org.apache.spark.sql.types.Decimal) 
data).toJavaBigDecimal(); } + @Override public byte[] convertFromStringToByte(Object data) { if (null == data) { return null; @@ -45,17 +60,34 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri return UTF8String.fromString((String) data).getBytes(); } - public Object convertFromByteToUTF8String(Object data) { + @Override + public Object convertFromByteToUTF8String(byte[] data) { if (null == data) { return null; } - return UTF8String.fromBytes((byte[]) data); + return UTF8String.fromBytes(data); + } + + @Override + public byte[] convertFromByteToUTF8Bytes(byte[] data) { + return UTF8String.fromBytes(data).getBytes(); } + @Override public Object convertFromStringToUTF8String(Object data) { if (null == data) { return null; } return UTF8String.fromString((String) data); } + + @Override + public Object wrapWithGenericArrayData(Object data) { + return new GenericArrayData(data); + } + + @Override + public Object wrapWithGenericRow(Object[] fields) { + return new GenericInternalRow(fields); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala index 5647427..452db56 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala @@ -50,6 +50,7 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.CarbonCompactionUtil import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor import 
org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl /** @@ -141,7 +142,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, var result : java.util.List[PartitionSpliterRawResultIterator] = null try { exec = new CarbonSplitExecutor(segmentMapping, carbonTable) - result = exec.processDataBlocks(segmentId) + result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl()) } catch { case e: Throwable => LOGGER.error(e) http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 59ed8ba..3250a53 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -49,6 +49,8 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep import org.apache.carbondata.processing.util.CarbonDataProcessorUtil object CarbonScalaUtil { + + // TODO: move this to spark module def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { dataType match { case StringType => CarbonDataTypes.STRING http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java 
b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java index 432d50a..9e0c102 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java @@ -36,14 +36,11 @@ class ColumnarVectorWrapper implements CarbonColumnVector { private boolean filteredRowsExist; - private DataType dataType; - private DataType blockDataType; ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) { this.columnVector = columnVector; this.filteredRows = filteredRows; - this.dataType = CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType()); } @Override public void putBoolean(int rowId, boolean value) { @@ -117,16 +114,16 @@ class ColumnarVectorWrapper implements CarbonColumnVector { @Override public void putDecimal(int rowId, BigDecimal value, int precision) { if (!filteredRows[rowId]) { - Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(value); + Decimal toDecimal = Decimal.apply(value); columnVector.putDecimal(counter++, toDecimal, precision); } } @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) { + Decimal decimal = Decimal.apply(value); for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(value); - columnVector.putDecimal(counter++, toDecimal, precision); + columnVector.putDecimal(counter++, decimal, precision); } rowId++; } @@ -210,7 +207,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector { } @Override public DataType getType() { - return dataType; + return CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType()); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 25ddd0b..0d00023 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -1079,5 +1079,4 @@ object CarbonDataRDDFactory { hadoopConf ).collect() } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala index 1de66c1..b7df9b4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala @@ -45,8 +45,8 @@ class SparkUnknownExpression( val values = carbonRowInstance.getValues.toSeq.map { case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s) - case d: java.math.BigDecimal => - org.apache.spark.sql.types.Decimal.apply(d) + case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d) + case b: Array[Byte] => org.apache.spark.unsafe.types.UTF8String.fromBytes(b) case value => value } try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala index c8f7ac7..ff17cfd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala @@ -20,16 +20,17 @@ package org.apache.spark.sql.execution.command.schema import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, MetadataCommand} +import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo, MetadataCommand} import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil} +import org.apache.carbondata.format.SchemaEvolutionEntry +import org.apache.carbondata.spark.util.DataTypeConverterUtil private[sql] case class CarbonAlterTableDataTypeChangeCommand( alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) @@ -65,8 +66,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( } val carbonColumn = 
carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName)) if (carbonColumn.size == 1) { - CarbonScalaUtil - .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head) + validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head) } else { LOGGER.audit(s"Alter table change data type request has failed. " + s"Column $columnName is invalid") @@ -119,4 +119,51 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( } Seq.empty } + + /** + * This method will validate a column for its data type and check whether the column data type + * can be modified and update if conditions are met. + */ + private def validateColumnDataType( + dataTypeInfo: DataTypeInfo, + carbonColumn: CarbonColumn): Unit = { + carbonColumn.getDataType.getName match { + case "INT" => + if (!dataTypeInfo.dataType.equals("bigint") && !dataTypeInfo.dataType.equals("long")) { + sys.error(s"Given column ${ carbonColumn.getColName } with data type " + + s"${carbonColumn.getDataType.getName} cannot be modified. " + + s"Int can only be changed to bigInt or long") + } + case "DECIMAL" => + if (!dataTypeInfo.dataType.equals("decimal")) { + sys.error(s"Given column ${ carbonColumn.getColName } with data type" + + s" ${ carbonColumn.getDataType.getName} cannot be modified." + + s" Decimal can be only be changed to Decimal of higher precision") + } + if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) { + sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " + + s"Specified precision value ${dataTypeInfo.precision} should be " + + s"greater than current precision value " + + s"${carbonColumn.getColumnSchema.getPrecision}") + } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) { + sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. 
" + + s"Specified scale value ${dataTypeInfo.scale} should be greater or " + + s"equal to current scale value ${carbonColumn.getColumnSchema.getScale}") + } else { + // difference of precision and scale specified by user should not be less than the + // difference of already existing precision and scale else it will result in data loss + val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision - + carbonColumn.getColumnSchema.getScale + val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale + if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) { + sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " + + s"Specified precision and scale values will lead to data loss") + } + } + case _ => + sys.error(s"Given column ${carbonColumn.getColName} with data type " + + s"${carbonColumn.getDataType.getName} cannot be modified. " + + s"Only Int and Decimal data types are allowed for modification") + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 2eed988..3cb46f7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -51,7 +51,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWrit import org.apache.carbondata.hadoop.internal.ObjectArrayWritable import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, SparkDataTypeConverterImpl, Util} class CarbonFileFormat extends FileFormat http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 7b4bc0d..d2ffac7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -1141,7 +1141,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule if(null == columnSchema) { null } else { - new QueryColumn(columnSchema.getColumnSchema, + new QueryColumn( + columnSchema.getColumnSchema, isFilterColumn, timeseriesFunction.toLowerCase) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/pom.xml ---------------------------------------------------------------------- diff --git a/processing/pom.xml b/processing/pom.xml index dfabaa2..648810d 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -39,11 +39,7 @@ <artifactId>carbondata-core</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - </dependency> - <dependency> + <dependency> <groupId>com.univocity</groupId> <artifactId>univocity-parsers</artifactId> <version>2.2.1</version> http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java 
---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java index bbc3697..bc28ace 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java @@ -40,9 +40,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.carbondata.processing.util.CarbonLoaderUtil; import org.apache.commons.lang3.StringUtils; -import org.apache.spark.annotation.DeveloperApi; -@DeveloperApi public class TableProcessingOperations { private static final LogService LOGGER = LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java new file mode 100644 index 0000000..bcf283e --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading.sort.unsafe.sort; + +/** + * Ported from Apache Spark. + * + * Abstraction for sorting an arbitrary input buffer of data. This interface requires determining + * the sort key for a given element index, as well as swapping elements and moving data from one + * buffer to another. + * Example format: an array of numbers, where each element is also the key. + * See [[KVArraySortDataFormat]] for a more exciting format. + * Note: Declaring and instantiating multiple subclasses of this class would prevent JIT inlining + * overridden methods and hence decrease the shuffle performance. + * + * @tparam K Type of the sort key of each element + * @tparam Buffer Internal data structure used by a particular format (e.g., Array[Int]). + */ +// TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity. +abstract class SortDataFormat<K, Buffer> { + + /** + * Creates a new mutable key for reuse. This should be implemented if you want to override + * [[getKey(Buffer, Int, K)]]. + */ + K newKey() { + return null; + } + + /** + * Return the sort key for the element at the given index. + */ + protected abstract K getKey(Buffer data, int pos); + + /** + * Returns the sort key for the element at the given index and reuse the input key if possible. + * The default implementation ignores the reuse parameter and invokes [[getKey(Buffer, Int)]]. + * If you want to override this method, you must implement [[newKey()]]. 
+ */ + K getKey(Buffer data, int pos, K reuse) { + return getKey(data, pos); + } + + /** + * Swap two elements. + */ + abstract void swap(Buffer data, int pos0, int pos1); + + /** + * Copy a single element from src(srcPos) to dst(dstPos). + */ + abstract void copyElement(Buffer src, int srcPos, Buffer dst, int dstPos); + + /** + * Copy a range of elements starting at src(srcPos) to dst, starting at dstPos. + * Overlapping ranges are allowed. + */ + abstract void copyRange(Buffer src, int srcPos, Buffer dst, int dstPos, int length); + + /** + * Allocates a Buffer that can hold up to 'length' elements. + * All elements of the buffer should be considered invalid until data is explicitly copied in. + */ + abstract Buffer allocate(int length); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java index dac3b47..377edfc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java @@ -18,8 +18,6 @@ package org.apache.carbondata.processing.loading.sort.unsafe.sort; import java.util.Comparator; -import org.apache.spark.util.collection.SortDataFormat; - /** * A port of the Apache Spark's TimSort and they originally ported from Android TimSort class, * which utilizes a "stable, adaptive, iterative mergesort." 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java index 92962d9..0c205d7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java @@ -20,8 +20,6 @@ import org.apache.carbondata.core.memory.IntPointerBuffer; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow; -import org.apache.spark.util.collection.SortDataFormat; - /** * Interface implementation for utilities to sort the data. 
*/ http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index e02f3ab..850ceca 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -36,6 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger; @@ -47,8 +48,6 @@ import org.apache.carbondata.processing.store.CarbonFactHandler; import org.apache.carbondata.processing.store.CarbonFactHandlerFactory; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; -import org.apache.spark.sql.types.Decimal; - /** * This class will process the query result and convert the data * into a format compatible for data load @@ -295,7 +294,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { private Object getConvertedMeasureValue(Object value, DataType type) { if (DataTypes.isDecimal(type)) { if (value != null) { - value = ((Decimal) value).toJavaBigDecimal(); + value = 
DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value); } return value; } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java index b18207d..173a5c0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java @@ -31,7 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator; -import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.DataTypeConverter; /** * Used to read carbon blocks when add/split partition @@ -46,10 +46,11 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor { this.carbonTable = carbonTable; } - public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId) + public List<PartitionSpliterRawResultIterator> processDataBlocks( + String segmentId, DataTypeConverter converter) throws QueryExecutionException, IOException { List<TableBlockInfo> list = null; - queryModel = carbonTable.createQueryModelWithProjectAllColumns(new DataTypeConverterImpl()); + queryModel = carbonTable.createQueryModelWithProjectAllColumns(converter); queryModel.setForcedDetailRawQuery(true); List<PartitionSpliterRawResultIterator> resultList = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index ff33823..f22d1c1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -47,9 +47,9 @@ import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.datatypes.GenericDataType; -import org.apache.spark.sql.types.Decimal; /** * Represent a page data for all columns, we store its data in columnar layout, so that @@ -184,7 +184,7 @@ public class TablePage { if (DataTypes.isDecimal(measurePages[i].getDataType()) && model.isCompactionFlow() && value != null) { - value = ((Decimal) value).toJavaBigDecimal(); + value = DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value); } measurePages[i].putData(rowId, value); }
