Fixed ClassCastException for integer data types in RestructureBasedVectorResultCollector
fixed cast exception for double datatype Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8c5c00c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8c5c00c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8c5c00c0 Branch: refs/heads/branch-1.1 Commit: 8c5c00c0c169d74adb72f13b8a781c893a6ad791 Parents: 8478939 Author: kunal642 <kunal.kap...@knoldus.in> Authored: Fri Apr 7 18:56:16 2017 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Thu Apr 13 19:15:54 2017 +0530 ---------------------------------------------------------------------- .../RestructureBasedVectorResultCollector.java | 28 ++++++++--- .../scan/executor/util/RestructureUtil.java | 50 +++++++++++++++++++- .../scan/executor/util/RestructureUtilTest.java | 2 +- .../carbondata/spark/load/CarbonLoaderUtil.java | 2 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 6 +-- .../execution/command/AlterTableCommands.scala | 4 -- .../apache/spark/sql/hive/CarbonMetastore.scala | 8 ++-- .../vectorreader/AddColumnTestCases.scala | 44 +++++++++++++++++ 8 files changed, 123 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java index 99b7f01..c09ec12 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.core.scan.collector.impl; -import java.math.BigDecimal; import java.util.List; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -36,10 +35,13 @@ import org.apache.spark.sql.types.Decimal; */ public class RestructureBasedVectorResultCollector extends DictionaryBasedVectorResultCollector { + private Object[] measureDefaultValues = null; + public RestructureBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) { super(blockExecutionInfos); queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions(); queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures(); + measureDefaultValues = new Object[queryMeasures.length]; allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length]; createVectorForNewlyAddedDimensions(); createVectorForNewlyAddedMeasures(); @@ -68,10 +70,23 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector // add a dummy column vector result collector object ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo(); allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo; + measureDefaultValues[i] = getMeasureDefaultValue(queryMeasures[i].getMeasure()); } } } + /** + * Gets the default value for each CarbonMeasure + * @param carbonMeasure + * @return + */ + private Object getMeasureDefaultValue(CarbonMeasure carbonMeasure) { + return RestructureUtil.getMeasureDefaultValueByType(carbonMeasure.getColumnSchema(), + carbonMeasure.getDefaultValue()); + } + + + @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) { throw new UnsupportedOperationException("collectData is not supported here"); } @@ -177,11 +192,11 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector private void fillDataForNonExistingMeasures() { for (int 
i = 0; i < tableBlockExecutionInfos.getActualQueryMeasures().length; i++) { if (!measureInfo.getMeasureExists()[i]) { + int queryOrder = tableBlockExecutionInfos.getActualQueryMeasures()[i].getQueryOrder(); CarbonMeasure measure = tableBlockExecutionInfos.getActualQueryMeasures()[i].getMeasure(); - ColumnVectorInfo columnVectorInfo = allColumnInfo[i]; - CarbonColumnVector vector = allColumnInfo[i].vector; - Object defaultValue = RestructureUtil - .getMeasureDefaultValue(measure.getColumnSchema(), measure.getDefaultValue()); + ColumnVectorInfo columnVectorInfo = allColumnInfo[queryOrder]; + CarbonColumnVector vector = columnVectorInfo.vector; + Object defaultValue = measureDefaultValues[i]; if (null == defaultValue) { vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size); } else { @@ -199,9 +214,8 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector (long) defaultValue); break; case DECIMAL: - Decimal convertToSparkType = Decimal.apply((BigDecimal) defaultValue); vector.putDecimals(columnVectorInfo.vectorOffset, columnVectorInfo.size, - convertToSparkType, measure.getPrecision()); + (Decimal) defaultValue, measure.getPrecision()); break; default: vector.putDoubles(columnVectorInfo.vectorOffset, columnVectorInfo.size, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java index 955f1f1..ab0ed55 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import 
org.apache.carbondata.core.util.DataTypeUtil; import org.apache.commons.lang3.ArrayUtils; +import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.types.UTF8String; /** @@ -295,7 +296,54 @@ public class RestructureUtil { new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); Double parsedValue = Double.valueOf(value); if (!Double.isInfinite(parsedValue) && !Double.isNaN(parsedValue)) { - measureDefaultValue = value; + measureDefaultValue = parsedValue; + } + } + } + return measureDefaultValue; + } + + /** + * Gets the default value based on the column data type. + * + * @param columnSchema + * @param defaultValue + * @return + */ + public static Object getMeasureDefaultValueByType(ColumnSchema columnSchema, + byte[] defaultValue) { + Object measureDefaultValue = null; + if (!isDefaultValueNull(defaultValue)) { + String value = null; + switch (columnSchema.getDataType()) { + case SHORT: + value = + new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + measureDefaultValue = Short.parseShort(value); + break; + case INT: + value = + new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + measureDefaultValue = Integer.parseInt(value); + break; + case LONG: + value = + new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + measureDefaultValue = Long.parseLong(value); + break; + case DECIMAL: + BigDecimal decimal = DataTypeUtil.byteToBigDecimal(defaultValue); + if (columnSchema.getScale() > decimal.scale()) { + decimal = decimal.setScale(columnSchema.getScale(), RoundingMode.HALF_UP); + } + measureDefaultValue = Decimal.apply(decimal); + break; + default: + value = + new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + Double parsedValue = Double.valueOf(value); + if (!Double.isInfinite(parsedValue) && !Double.isNaN(parsedValue)) { + measureDefaultValue = parsedValue; } } } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java index 5387823..df14381 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java @@ -129,7 +129,7 @@ public class RestructureUtilTest { MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo(); boolean[] measuresExist = { true, true, false }; assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist))); - Object[] defaultValues = { null, null, "3" }; + Object[] defaultValues = { null, null, 3.0 }; assertThat(measureInfo.getDefaultValues(), is(equalTo(defaultValues))); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 036e574..27a8e3a 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -184,7 +184,7 @@ public final class CarbonLoaderUtil { try { long startTime = System.currentTimeMillis(); File file = new File(localStoreLocation); - CarbonUtil.deleteFoldersAndFiles(file.getParentFile()); + CarbonUtil.deleteFoldersAndFiles(file); LOGGER.info( 
"Deleted the local store location" + localStoreLocation + " : TIme taken: " + ( System.currentTimeMillis() - startTime)); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 9d2c245..9ca9163 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -150,12 +150,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val DECIMAL = carbonKeyWord("DECIMAL") protected val DOUBLE = carbonKeyWord("DOUBLE") protected val FLOAT = carbonKeyWord("FLOAT") - protected val SHORT = carbonKeyWord("SMALLINT") + protected val SHORT = carbonKeyWord("SHORT") protected val INT = carbonKeyWord("INT") protected val BIGINT = carbonKeyWord("BIGINT") protected val ARRAY = carbonKeyWord("ARRAY") protected val STRUCT = carbonKeyWord("STRUCT") - + protected val SMALLINT = carbonKeyWord("SMALLINT") protected val CHANGE = carbonKeyWord("CHANGE") protected val TBLPROPERTIES = carbonKeyWord("TBLPROPERTIES") @@ -856,7 +856,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected lazy val primitiveTypes = STRING ^^^ "string" | INTEGER ^^^ "integer" | TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" | - BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" | + BIGINT ^^^ "bigint" | (SHORT | SMALLINT) ^^^ "smallint" | INT ^^^ "int" | DOUBLE ^^^ "double" | FLOAT ^^^ "double" | decimalType | DATE ^^^ "date" | charType 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala index 0be0bdf..e380217 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala @@ -82,8 +82,6 @@ private[sql] case class AlterTableAddColumns( schemaEvolutionEntry.setAdded(newCols.toList.asJava) val thriftTable = schemaConverter .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) - thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) - .setTime_stamp(System.currentTimeMillis) AlterTableUtil .updateSchemaInfo(carbonTable, schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), @@ -301,8 +299,6 @@ private[sql] case class AlterTableDropColumns( } } // add deleted columns to schema evolution history and update the schema - tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) - .setTime_stamp(System.currentTimeMillis) val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava) AlterTableUtil http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala 
index 6f74960..1f5736e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -315,12 +315,12 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) { carbonStorePath: String) (sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl + thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) val wrapperTableInfo = schemaConverter .fromExternalToWrapperTableInfo(thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName, - carbonStorePath) - thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) + carbonTableIdentifier.getDatabaseName, + carbonTableIdentifier.getTableName, + carbonStorePath) createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, carbonTableIdentifier.getDatabaseName, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c5c00c0/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index 747af05..3b4a25c 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -187,6 +187,50 @@ class AddColumnTestCases extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select distinct(CUST_NAME) from carbon_new"),Row("testuser")) } + test("test to check if intField returns correct result") { + 
sql("DROP TABLE IF EXISTS carbon_table") + sql("CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField timestamp, decimalField decimal(6,2)) STORED BY 'carbondata'") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')") + sql( + "Alter table carbon_table add columns(newField int) TBLPROPERTIES" + + "('DEFAULT.VALUE.newField'='67890')") + checkAnswer(sql("select distinct(newField) from carbon_table"), Row(67890)) + sql("DROP TABLE IF EXISTS carbon_table") + } + + test("test to check if shortField returns correct result") { + sql("DROP TABLE IF EXISTS carbon_table") + sql("CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField timestamp, decimalField decimal(6,2)) STORED BY 'carbondata'") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')") + sql( + "Alter table carbon_table add columns(newField short) TBLPROPERTIES" + + "('DEFAULT.VALUE.newField'='1')") + checkAnswer(sql("select distinct(newField) from carbon_table"), Row(1)) + sql("DROP TABLE IF EXISTS carbon_table") + } + + test("test to check if doubleField returns correct result") { + sql("DROP TABLE IF EXISTS carbon_table") + sql("CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField timestamp, decimalField decimal(6,2)) STORED BY 'carbondata'") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')") + sql( + "Alter table carbon_table add columns(newField double) TBLPROPERTIES" + + "('DEFAULT.VALUE.newField'='1457567.87')") + checkAnswer(sql("select distinct(newField) from carbon_table"), Row(1457567.87)) + sql("DROP TABLE IF EXISTS 
carbon_table") + } + + test("test to check if decimalField returns correct result") { + sql("DROP TABLE IF EXISTS carbon_table") + sql("CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField timestamp, decimalField decimal(6,2)) STORED BY 'carbondata'") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')") + sql( + "Alter table carbon_table add columns(newField decimal(5,2)) TBLPROPERTIES" + + "('DEFAULT.VALUE.newField'='21.87')") + checkAnswer(sql("select distinct(newField) from carbon_table"), Row(21.87)) + sql("DROP TABLE IF EXISTS carbon_table") + } + override def afterAll { sql("DROP TABLE IF EXISTS addcolumntest")