Repository: carbondata Updated Branches: refs/heads/master f0f4d7d09 -> 30457c415
[CARBONDATA-1822][Spark-Integration] Support DDL to register the CarbonData table from existing carbon table data Problem: Support DDL command to create the table from existing table data. Solution: Existing spark refresh table DDL command will be overridden in the carbon to support the register carbon table with hivemetastore. REFRESH TABLE <db_name>.<table_name>; This closes #1583 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/30457c41 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/30457c41 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/30457c41 Branch: refs/heads/master Commit: 30457c41511ed22c9980cfe575e3f277cc7b4c33 Parents: f0f4d7d Author: mohammadshahidkhan <[email protected]> Authored: Thu Nov 23 20:02:14 2017 +0530 Committer: ravipesala <[email protected]> Committed: Thu Dec 7 18:52:30 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 5 + .../ThriftWrapperSchemaConverterImpl.java | 7 +- .../carbondata/hadoop/util/SchemaReader.java | 37 ++- .../carbondata/events/RefreshTableEvents.scala | 36 +++ .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 47 ++-- .../command/carbonTableSchemaCommon.scala | 30 +- .../carbondata/spark/util/CarbonSparkUtil.scala | 47 ++++ .../org/apache/spark/sql/CarbonSource.scala | 4 +- .../management/RefreshCarbonTableCommand.scala | 208 ++++++++++++++ .../CreatePreAggregateTableCommand.scala | 4 +- .../table/CarbonCreateTableCommand.scala | 33 ++- .../sql/execution/strategy/DDLStrategy.scala | 12 +- .../spark/sql/hive/CarbonHiveMetaStore.scala | 2 - .../apache/spark/sql/hive/CarbonMetaStore.scala | 11 +- .../sql/parser/CarbonSpark2SqlParser.scala | 3 + .../spark/sql/parser/CarbonSparkSqlParser.scala | 7 +- .../register/TestRegisterCarbonTable.scala | 281 +++++++++++++++++++ 17 files changed, 694 insertions(+), 80 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 72d8b0c..5fb08a3 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1380,6 +1380,11 @@ public final class CarbonCommonConstants { */ public static final String TABLE_COMMENT = "comment"; + /** + * this will be used to provide comment for table + */ + public static final String COLUMN_COMMENT = "comment"; + public static final String BITSET_PIPE_LINE_DEFAULT = "true"; /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java index c1e68da..945a40c 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java @@ -194,9 +194,12 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { thriftColumnSchema.setColumnReferenceId(wrapperColumnSchema.getColumnReferenceId()); thriftColumnSchema.setSchemaOrdinal(wrapperColumnSchema.getSchemaOrdinal()); if (wrapperColumnSchema.isSortColumn()) { - 
Map<String, String> properties = new HashMap<String, String>(); + Map<String, String> properties = wrapperColumnSchema.getColumnProperties(); + if (null == properties) { + properties = new HashMap<String, String>(); + thriftColumnSchema.setColumnProperties(properties); + } properties.put(CarbonCommonConstants.SORT_COLUMNS, "true"); - thriftColumnSchema.setColumnProperties(properties); } thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getAggFunction()); if (null != wrapperColumnSchema.getTimeSeriesFunction() && !wrapperColumnSchema http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java index 47afc9c..c0f8816 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java @@ -21,16 +21,15 @@ import java.io.IOException; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonMetadata; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.thrift.TBase; - /** * TODO: It should be 
removed after store manager implementation. */ @@ -46,18 +45,8 @@ public class SchemaReader { FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) { String tableName = identifier.getCarbonTableIdentifier().getTableName(); - ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() { - public TBase create() { - return new org.apache.carbondata.format.TableInfo(); - } - }; - ThriftReader thriftReader = - new ThriftReader(carbonTablePath.getSchemaFilePath(), createTBase); - thriftReader.open(); org.apache.carbondata.format.TableInfo tableInfo = - (org.apache.carbondata.format.TableInfo) thriftReader.read(); - thriftReader.close(); - + CarbonUtil.readSchemaFile(carbonTablePath.getSchemaFilePath()); SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( tableInfo, @@ -71,4 +60,24 @@ public class SchemaReader { throw new IOException("File does not exist: " + schemaFilePath); } } + /** + * the method returns the Wrapper TableInfo + * + * @param absoluteTableIdentifier + * @return + */ + public static TableInfo getTableInfo(AbsoluteTableIdentifier absoluteTableIdentifier) + throws IOException { + CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); + org.apache.carbondata.format.TableInfo thriftTableInfo = + CarbonUtil.readSchemaFile(carbonTablePath.getSchemaFilePath()); + ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter = + new ThriftWrapperSchemaConverterImpl(); + CarbonTableIdentifier carbonTableIdentifier = + absoluteTableIdentifier.getCarbonTableIdentifier(); + TableInfo tableInfo = thriftWrapperSchemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, carbonTableIdentifier.getDatabaseName(), + carbonTableIdentifier.getTableName(), absoluteTableIdentifier.getTablePath()); + return tableInfo; + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala new file mode 100644 index 0000000..662f9e7 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.events + +import org.apache.spark.sql._ + +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier + +/** + * Class for handling operations before start of a table registration process. + * Example usage: For validation purpose + */ +case class RefreshTablePreExecutionEvent(sparkSession: SparkSession, + identifier: AbsoluteTableIdentifier) extends Event with TableEventInfo + +/** + * Class for handling operations after finish of registration process. 
+ * Example usage: For validation purpose + */ +case class RefreshTablePostExecutionEvent(sparkSession: SparkSession, + identifier: AbsoluteTableIdentifier) extends Event with TableEventInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 8972b3d..081e5cf 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -1044,48 +1044,48 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { dataType match { case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent, - field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema - ) + field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema, + field.columnComment) case "smallint" => Field(field.column, Some("SmallInt"), field.name, Some(null), field.parent, field.storeType, field.schemaOrdinal, - field.precision, field.scale, field.rawSchema) + field.precision, field.scale, field.rawSchema, field.columnComment) case "integer" | "int" => Field(field.column, Some("Integer"), field.name, Some(null), field.parent, field.storeType, field.schemaOrdinal, - field.precision, field.scale, field.rawSchema) + field.precision, field.scale, field.rawSchema, field.columnComment) case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent, - field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema - ) + field.storeType, field.schemaOrdinal, 
field.precision, field.scale, field.rawSchema, + field.columnComment) case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent, - field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema - ) + field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema, + field.columnComment) case "float" => Field(field.column, Some("Double"), field.name, Some(null), field.parent, - field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema - ) + field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema, + field.columnComment) case "timestamp" => Field(field.column, Some("Timestamp"), field.name, Some(null), field.parent, field.storeType, field.schemaOrdinal, - field.precision, field.scale, field.rawSchema) + field.precision, field.scale, field.rawSchema, field.columnComment) case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent, - field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema - ) + field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema, + field.columnComment) case "array" => Field(field.column, Some("Array"), field.name, field.children.map(f => f.map(normalizeType(_))), field.parent, field.storeType, field.schemaOrdinal, - field.precision, field.scale, field.rawSchema) + field.precision, field.scale, field.rawSchema, field.columnComment) case "struct" => Field(field.column, Some("Struct"), field.name, field.children.map(f => f.map(normalizeType(_))), field.parent, field.storeType, field.schemaOrdinal, - field.precision, field.scale, field.rawSchema) + field.precision, field.scale, field.rawSchema, field.columnComment) case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent, - field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema - ) + field.storeType, field.schemaOrdinal, 
field.precision, field.scale, field.rawSchema, + field.columnComment) case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent, - field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema - ) + field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema, + field.columnComment) // checking if the nested data type contains the child type as decimal(10,0), // if it is present then extracting the precision and scale. resetting the data type // with Decimal. @@ -1098,7 +1098,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { field.parent, field.storeType, field.schemaOrdinal, precision, scale, - field.rawSchema + field.rawSchema, + field.columnComment ) case _ => field @@ -1109,10 +1110,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { field.dataType.getOrElse("NIL") match { case "Array" => Field(field.column, Some("Array"), field.name, field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent, - field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema) + field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema, + columnComment = field.columnComment) case "Struct" => Field(field.column, Some("Struct"), field.name, field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent, - field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema) + field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema, + columnComment = field.columnComment) case _ => field } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 37663ea..6a109f4 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -67,7 +67,8 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri children: Option[List[Field]], parent: String = null, storeType: Option[String] = Some("columnar"), var schemaOrdinal: Int = -1, - var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "") { + var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "", + var columnComment: String = "") { override def equals(o: Any) : Boolean = o match { case that: Field => that.column.equalsIgnoreCase(this.column) @@ -320,13 +321,8 @@ class AlterTableColumnSchemaGenerator( // TODO: move this to carbon store API object TableNewProcessor { def apply( - cm: TableModel, - identifier: AbsoluteTableIdentifier): TableInfo = { - new TableNewProcessor( - cm, - identifier.getDatabaseName, - identifier.getTableName, - identifier.getTablePath).process + cm: TableModel): TableInfo = { + new TableNewProcessor(cm).process } def createColumnSchema( @@ -373,7 +369,7 @@ object TableNewProcessor { } } -class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, tablePath: String) { +class TableNewProcessor(cm: TableModel) { def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = { var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]() @@ -445,6 +441,14 @@ class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, table } // TODO: Need to fill RowGroupID, converted type // & Number of Children after DDL finalization + if (field.columnComment.nonEmpty) { + var columnProperties = columnSchema.getColumnProperties + if (columnProperties == null) { + columnProperties = 
new util.HashMap[String, String]() + columnSchema.setColumnProperties(columnProperties) + } + columnProperties.put(CarbonCommonConstants.COLUMN_COMMENT, field.columnComment) + } columnSchema } @@ -622,12 +626,12 @@ class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, table partitionInfo.setColumnSchemaList(partitionCols) tableSchema.setPartitionInfo(partitionInfo) } - tableSchema.setTableName(tableName) + tableSchema.setTableName(cm.tableName) tableSchema.setListOfColumns(allColumns.asJava) tableSchema.setSchemaEvalution(schemaEvol) - tableInfo.setTablePath(tablePath) - tableInfo.setDatabaseName(dbName) - tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName)) + tableInfo.setDatabaseName(cm.databaseNameOp.getOrElse(null)) + tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(cm.databaseNameOp.getOrElse(null), + cm.tableName)) tableInfo.setLastUpdatedTime(System.currentTimeMillis()) tableInfo.setFactTable(tableSchema) tableInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index 47f5344..7cc3d11 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -21,8 +21,10 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap} +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn case class TransformHolder(rdd: Any, mataData: CarbonMetaData) @@ -51,4 +53,49 @@ object CarbonSparkUtil { table) } + /** + * return's the formatted column comment if column comment is present else empty("") + * + * @param carbonColumn + * @return + */ + def getColumnComment(carbonColumn: CarbonColumn): String = { + { + val columnProperties = carbonColumn.getColumnProperties + if (columnProperties != null) { + val comment: String = columnProperties.get(CarbonCommonConstants.COLUMN_COMMENT) + if (comment != null && comment != null) { + return " comment \"" + comment + "\"" + } + } + "" + } + } + + /** + * the method return's raw schema + * + * @param carbonRelation + * @return + */ + def getRawSchema(carbonRelation: CarbonRelation): String = { + val fields = new Array[String]( + carbonRelation.dimensionsAttr.size + carbonRelation.measureAttr.size) + val carbonTable = carbonRelation.carbonTable + carbonRelation.dimensionsAttr.foreach(attr => { + val carbonDimension = carbonTable.getDimensionByName(carbonRelation.tableName, attr.name) + val carbonColumn = carbonTable.getColumnByName(carbonRelation.tableName, attr.name) + val columnComment = getColumnComment(carbonColumn) + fields(carbonDimension.getSchemaOrdinal) = + '`' + attr.name + '`' + ' ' + attr.dataType.catalogString + columnComment + }) + carbonRelation.measureAttr.foreach(msrAtrr => { + val carbonMeasure = carbonTable.getMeasureByName(carbonRelation.tableName, msrAtrr.name) + val carbonColumn = carbonTable.getColumnByName(carbonRelation.tableName, msrAtrr.name) + val columnComment = getColumnComment(carbonColumn) + fields(carbonMeasure.getSchemaOrdinal) = + '`' + msrAtrr.name + '`' + ' ' + msrAtrr.dataType.catalogString + columnComment + }) + fields.mkString(",") + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 7fb146c..e61b636 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -308,7 +308,9 @@ object CarbonSource { metaStore: CarbonMetaStore, properties: Map[String, String]): Map[String, String] = { val model = createTableInfoFromParams(properties, dataSchema, identifier) - val tableInfo: TableInfo = TableNewProcessor(model, identifier) + val tableInfo: TableInfo = TableNewProcessor(model) + tableInfo.setTablePath(identifier.getTablePath) + tableInfo.setDatabaseName(identifier.getDatabaseName) val schemaEvolutionEntry = new SchemaEvolutionEntry schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala new file mode 100644 index 0000000..45ed298 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.management + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.command.MetadataCommand +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.util.CarbonException + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent} +import org.apache.carbondata.hadoop.util.SchemaReader + +/** + * Command to register carbon table from existing carbon table data + */ +case class RefreshCarbonTableCommand( + databaseNameOp: Option[String], + tableName: String) + extends MetadataCommand { + val LOGGER: LogService = + LogServiceFactory.getLogService(this.getClass.getName) + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val metaStore = 
CarbonEnv.getInstance(sparkSession).carbonMetastore + val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + // Steps + // 1. get table path + // 2. perform the below steps + // 2.1 check if the table already register with hive then ignore and continue with the next + // schema + // 2.2 register the table with the hive check if the table being registered has aggregate table + // then do the below steps + // 2.2.1 validate that all the aggregate tables are copied at the store location. + // 2.2.2 Register the aggregate tables + val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession) + val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName) + // 2.1 check if the table already register with hive then ignore and continue with the next + // schema + if (!sparkSession.sessionState.catalog.listTables(databaseName) + .exists(_.table.equalsIgnoreCase(tableName))) { + val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier) + // check the existence of the schema file to know its a carbon table + val schemaFilePath = carbonTablePath.getSchemaFilePath + // if schema file does not exist then the table will either non carbon table or stale + // carbon table + if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) { + // read TableInfo + val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier) + // 2.2 register the table with the hive check if the table being registered has + // aggregate table then do the below steps + // 2.2.1 validate that all the aggregate tables are copied at the store location. 
+ val dataMapSchemaList = tableInfo.getDataMapSchemaList + if (null != dataMapSchemaList && dataMapSchemaList.size() != 0) { + // validate all the aggregate tables are copied at the storeLocation + val allExists = validateAllAggregateTablePresent(databaseName, + dataMapSchemaList, sparkSession) + if (!allExists) { + // fail the register operation + val msg = s"Table registration with Database name [$databaseName] and Table name " + + s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" + + s" not copied under database [$databaseName]" + LOGGER.audit(msg) + CarbonException.analysisException(msg) + } + // 2.2.1 Register the aggregate tables to hive + registerAggregates(databaseName, dataMapSchemaList)(sparkSession) + } + registerTableWithHive(databaseName, tableName, tableInfo)(sparkSession) + } else { + LOGGER.audit( + s"Table registration with Database name [$databaseName] and Table name [$tableName] " + + s"failed." + + s"Table [$tableName] either non carbon table or stale carbon table under database " + + s"[$databaseName]") + } + } else { + LOGGER.audit( + s"Table registration with Database name [$databaseName] and Table name [$tableName] " + + s"failed." 
+ + s"Table [$tableName] either already exists or registered under database [$databaseName]") + } + // update the schema modified time + metaStore.updateAndTouchSchemasUpdatedTime() + Seq.empty + } + + /** + * the method prepare the data type for raw column + * + * @param column + * @return + */ + def prepareDataType(column: ColumnSchema): String = { + column.getDataType.getName.toLowerCase() match { + case "decimal" => + "decimal(" + column.getPrecision + "," + column.getScale + ")" + case others => + others + } + } + + /** + * The method register the carbon table with hive + * + * @param dbName + * @param tableName + * @param tableInfo + * @param sparkSession + * @return + */ + def registerTableWithHive(dbName: String, + tableName: String, + tableInfo: TableInfo)(sparkSession: SparkSession): Any = { + val operationContext = new OperationContext + try { + val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent = + new RefreshTablePreExecutionEvent(sparkSession, + tableInfo.getOrCreateAbsoluteTableIdentifier()) + OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext) + CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false).run(sparkSession) + LOGGER.audit(s"Table registration with Database name [$dbName] and Table name " + + s"[$tableName] is successful.") + } catch { + case e: AnalysisException => throw e + case e: Exception => + throw e + } + val refreshTablePostExecutionEvent: RefreshTablePostExecutionEvent = + new RefreshTablePostExecutionEvent(sparkSession, + tableInfo.getOrCreateAbsoluteTableIdentifier()) + OperationListenerBus.getInstance.fireEvent(refreshTablePostExecutionEvent, operationContext) + } + + /** + * The method validate that all the aggregate table are physically present + * + * @param dataMapSchemaList + * @param sparkSession + */ + def validateAllAggregateTablePresent(dbName: String, dataMapSchemaList: util.List[DataMapSchema], + sparkSession: SparkSession): Boolean = { + var fileExist = false 
+ dataMapSchemaList.asScala.foreach(dataMap => { + val tableName = dataMap.getChildSchema.getTableName + val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, + new CarbonTableIdentifier(dbName, tableName, dataMap.getChildSchema.getTableId)) + val schemaFilePath = carbonTablePath.getSchemaFilePath + try { + fileExist = FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath)) + } catch { + case e: Exception => + fileExist = false + } + if (!fileExist) { + return fileExist; + } + }) + return true + } + + /** + * The method iterates over all the aggregate tables and register them to hive + * + * @param dataMapSchemaList + * @return + */ + def registerAggregates(dbName: String, + dataMapSchemaList: util.List[DataMapSchema])(sparkSession: SparkSession): Any = { + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore + dataMapSchemaList.asScala.foreach(dataMap => { + val tableName = dataMap.getChildSchema.getTableName + if (!sparkSession.sessionState.catalog.listTables(dbName) + .exists(_.table.equalsIgnoreCase(tableName))) { + val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession) + val absoluteTableIdentifier = AbsoluteTableIdentifier + .from(tablePath, dbName, tableName) + val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier) + registerTableWithHive(dbName, tableName, tableInfo)(sparkSession) + } + }) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 1c23d3a..9a84450 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.statusmanager.SegmentStatusManager /** @@ -89,7 +90,8 @@ case class CreatePreAggregateTableCommand( tableModel.dataMapRelation = Some(fieldRelationMap) val tablePath = CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession) - CarbonCreateTableCommand(tableModel, Some(tablePath)).run(sparkSession) + CarbonCreateTableCommand(TableNewProcessor(tableModel), + tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession) val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession) val tableInfo = table.getTableInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 3ab221c..78b9634 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -29,25 +29,34 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.TableInfo +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.spark.util.CarbonSparkUtil case class CarbonCreateTableCommand( - cm: TableModel, + tableInfo: TableInfo, + ifNotExistsSet: Boolean = false, tableLocation: Option[String] = None, createDSTable: Boolean = true) extends MetadataCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val tableName = cm.tableName - val dbName = CarbonEnv.getDatabaseName(cm.databaseNameOp)(sparkSession) + val tableName = tableInfo.getFactTable.getTableName + var databaseOpt : Option[String] = None + if(tableInfo.getDatabaseName != null) { + databaseOpt = Some(tableInfo.getDatabaseName) + } + val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) + // set dbName and tableUnique Name in the table info + tableInfo.setDatabaseName(dbName) + tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName)) LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tableName]") if (sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { - if (!cm.ifNotExistsSet) { + if (!ifNotExistsSet) { LOGGER.audit( s"Table creation with Database name [$dbName] and 
Table name [$tableName] failed. " + s"Table [$tableName] already exists under database [$dbName]") @@ -56,9 +65,9 @@ case class CarbonCreateTableCommand( } val tablePath = tableLocation.getOrElse( - CarbonEnv.getTablePath(cm.databaseNameOp, tableName)(sparkSession)) + CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)) + tableInfo.setTablePath(tablePath) val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName) - val tableInfo: TableInfo = TableNewProcessor(cm, tableIdentifier) // Add validation for sort scope when create table val sortScope = tableInfo.getFactTable.getTableProperties.asScala @@ -81,15 +90,13 @@ case class CarbonCreateTableCommand( val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier) if (createDSTable) { try { - val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) - cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f) - cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f) - - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) val tablePath = tableIdentifier.getTablePath + val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath) + val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation) + sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) sparkSession.sql( s"""CREATE TABLE $dbName.$tableName - |(${ fields.map(f => f.rawSchema).mkString(",") }) + |(${ rawSchema }) |USING org.apache.spark.sql.CarbonSource |OPTIONS ( | tableName "$tableName", http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 6c8cf44..9ae1979 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -22,15 +22,16 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand} +import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand} import org.apache.spark.sql.execution.command.partition.CarbonShowCarbonPartitionsCommand import org.apache.spark.sql.execution.command.schema._ import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand} +import org.apache.spark.sql.execution.datasources.RefreshTable import org.apache.spark.util.FileUtils -import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.merger.CompactionType import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -40,7 +41,8 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException */ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { - + val LOGGER: LogService = + LogServiceFactory.getLogService(this.getClass.getName) def apply(plan: LogicalPlan): Seq[SparkPlan] = { plan match { case 
LoadDataCommand(identifier, path, isLocal, isOverwrite, partition) @@ -193,6 +195,10 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { ExecutedCommandExec( CarbonAlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil } + case RefreshTable(tableIdentifier) => + RefreshCarbonTableCommand(tableIdentifier.database, + tableIdentifier.table).run(sparkSession) + ExecutedCommandExec(RefreshTable(tableIdentifier)) :: Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 4b33c40..066cb1c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -237,6 +237,4 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { sparkSession, schemaConverter) } - - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index 696342f..5a8e58b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -42,6 +42,7 @@ trait CarbonMetaStore { /** * Create spark session from paramters. 
+ * * @param parameters * @param absIdentifier * @param sparkSession @@ -52,8 +53,8 @@ trait CarbonMetaStore { def tableExists( - table: String, - databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean + table: String, + databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean @@ -101,6 +102,7 @@ trait CarbonMetaStore { def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier: AbsoluteTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo)(sparkSession: SparkSession): String + /** * Prepare Thrift Schema from wrapper TableInfo and write to disk */ @@ -108,6 +110,7 @@ trait CarbonMetaStore { /** * Generates schema string to save it in hive metastore + * * @param tableInfo * @return */ @@ -134,16 +137,14 @@ trait CarbonMetaStore { def checkSchemasModifiedTimeAndReloadTables() - def isReadFromHiveMetaStore : Boolean + def isReadFromHiveMetaStore: Boolean def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] - } - /** * Factory for Carbon metastore */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 343db49..ffa2d32 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -522,8 +522,10 @@ class CarbonSpark2SqlParser extends 
CarbonDDLSqlParser { def getFields(schema: Seq[StructField]): Seq[Field] = { schema.map { col => var columnComment: String = "" + var plainComment: String = "" if (col.getComment().isDefined) { columnComment = " comment \"" + col.getComment().get + "\"" + plainComment = col.getComment().get } val x = if (col.dataType.catalogString == "float") { @@ -553,6 +555,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { f.dataType = Some("double") } f.rawSchema = x + f.columnComment = plainComment f } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 26529f9..55d784f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.execution.command.{PartitionerField, TableModel} +import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor} import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} import org.apache.spark.sql.types.StructField @@ -33,7 +33,7 @@ import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.constants.CarbonCommonConstants -import 
org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil @@ -228,8 +228,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) bucketFields, isAlterFlow = false, tableComment) - - CarbonCreateTableCommand(tableModel, tablePath) + CarbonCreateTableCommand(TableNewProcessor(tableModel), tableModel.ifNotExistsSet, tablePath) } private def validateStreamingProperty(carbonOption: CarbonOption): Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala new file mode 100644 index 0000000..e48aaba --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.carbondata.register + +import java.io.{File, IOException} + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{AnalysisException, Row} +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} + +/** + * + */ +class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("drop database if exists carbon cascade") + } + + def restoreData(dblocation: String, tableName: String) = { + val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + val source = dblocation+ "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName + try { + FileUtils.copyDirectory(new File(source), new File(destination)) + FileUtils.deleteDirectory(new File(source)) + } catch { + case e : Exception => + throw new IOException("carbon table data restore failed.") + } finally { + + } + } + def backUpData(dblocation: String, tableName: String) = { + val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + val destination = dblocation+ "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName + try { + FileUtils.copyDirectory(new File(source), new File(destination)) + } catch { + case e : Exception => + throw new IOException("carbon table data backup failed.") + } + } + + test("register tables test") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon 
location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + backUpData(dblocation, "carbontable") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) from carbontable"), Row(1)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) + } + + test("register table test") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + backUpData(dblocation, "carbontable") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) from carbontable"), Row(1)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"))) + } + + test("register pre aggregate tables test") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'aa','aaa'") + sql("insert into carbontable select 'a',10,'aa','aaa'") + sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1") + backUpData(dblocation, "carbontable") + backUpData(dblocation, "carbontable_preagg1") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + restoreData(dblocation, "carbontable_preagg1") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) 
from carbontable"), Row(3)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) + checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) + checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) + } + + test("register pre aggregate table test") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'aa','aaa'") + sql("insert into carbontable select 'a',10,'aa','aaa'") + sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1") + backUpData(dblocation, "carbontable") + backUpData(dblocation, "carbontable_preagg1") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + restoreData(dblocation, "carbontable_preagg1") + sql("refresh table carbontable") + checkAnswer(sql("select count(*) from carbontable"), Row(3)) + checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a"))) + checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2)) + checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b"))) + } + + test("register pre aggregate table should fail if the aggregate table not copied") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'aa','aaa'") + sql("insert into carbontable select 'a',10,'aa','aaa'") + sql("create datamap preagg1 on table carbontable 
using 'preaggregate' as select c1,sum(c2) from carbontable group by c1") + backUpData(dblocation, "carbontable") + backUpData(dblocation, "carbontable_preagg1") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + try { + sql("refresh table carbontable") + assert(false) + } catch { + case e : AnalysisException => + assert(true) + } + restoreData(dblocation, "carbontable_preagg1") + } + + test("Update operation on carbon table should pass after registration or refresh") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'bb','bbb'") + backUpData(dblocation, "carbontable") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + // update operation + sql("""update carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show() + sql("""update carbon.carbontable d set (d.c2) = (d.c2 + 1) where d.c1 = 'b'""").show() + checkAnswer( + sql("""select c1,c2,c3,c5 from carbon.carbontable"""), + Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb")) + ) + } + + test("Update operation on carbon table") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql( + """ + CREATE TABLE automerge(id int, name string, city string, age int) + STORED BY 'org.apache.carbondata.format' + """) + val testData = s"$resourcesPath/sample.csv" + sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge") + backUpData(dblocation, "automerge") + sql("drop table automerge") + restoreData(dblocation, "automerge") + sql("refresh table automerge") + // update operation + sql("""update carbon.automerge d set (d.id) = (d.id + 1) where d.id > 
2""").show() + checkAnswer( + sql("select count(*) from automerge"), + Seq(Row(6)) + ) + // sql("drop table carbontable") + } + + test("Delete operation on carbon table") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'bb','bbb'") + backUpData(dblocation, "carbontable") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + // delete operation + sql("""delete from carbontable where c3 = 'aa'""").show + checkAnswer( + sql("""select c1,c2,c3,c5 from carbon.carbontable"""), + Seq(Row("b",1,"bb","bbb")) + ) + sql("drop table carbontable") + } + + test("Alter table add column test") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'bb','bbb'") + backUpData(dblocation, "carbontable") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + sql("Alter table carbontable add columns(c4 string) " + + "TBLPROPERTIES('DICTIONARY_EXCLUDE'='c4', 'DEFAULT.VALUE.c4'='def')") + checkAnswer( + sql("""select c1,c2,c3,c5,c4 from carbon.carbontable"""), + Seq(Row("a",1,"aa","aaa","def"), Row("b",1,"bb","bbb","def")) + ) + sql("drop table carbontable") + } + + test("Alter table change column datatype test") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 
int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'bb','bbb'") + backUpData(dblocation, "carbontable") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + sql("Alter table carbontable change c2 c2 long") + checkAnswer( + sql("""select c1,c2,c3,c5 from carbon.carbontable"""), + Seq(Row("a",1,"aa","aaa"), Row("b",1,"bb","bbb")) + ) + sql("drop table carbontable") + } + + test("Alter table drop column test") { + sql("drop database if exists carbon cascade") + sql(s"create database carbon location '$dblocation'") + sql("use carbon") + sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") + sql("insert into carbontable select 'a',1,'aa','aaa'") + sql("insert into carbontable select 'b',1,'bb','bbb'") + backUpData(dblocation, "carbontable") + sql("drop table carbontable") + restoreData(dblocation, "carbontable") + sql("refresh table carbontable") + sql("Alter table carbontable drop columns(c2)") + checkAnswer( + sql("""select * from carbon.carbontable"""), + Seq(Row("a","aa","aaa"), Row("b","bb","bbb")) + ) + sql("drop table carbontable") + } + + override def afterAll { + sql("use default") + sql("drop database if exists carbon cascade") + } +}
