This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 9e423e9 [CARBONDATA-3518] Support create table like command
9e423e9 is described below
commit 9e423e9a46d443ef82f945c707319ba533eb3cb7
Author: Manhua <[email protected]>
AuthorDate: Wed Sep 11 17:43:58 2019 +0800
[CARBONDATA-3518] Support create table like command
Support CREATE TABLE ... LIKE SQL syntax for carbon table
This closes #3382
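The supported syntax, as documented in the new CarbonCreateTableLikeCommand below, is:

    CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
    LIKE [other_db_name.]existing_table_name

A minimal usage sketch in the style of the new test suite (table names are illustrative; the command copies schema and table properties only, not data):

    sql("create table src(a int, b string) stored by 'carbondata'")
    sql("create table dst like src")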
---
.../core/metadata/schema/table/TableSchema.java | 31 ++-
.../createTable/TestCreateTableIfNotExists.scala | 8 -
.../createTable/TestCreateTableLike.scala | 211 +++++++++++++++++++++
.../table/CarbonCreateTableLikeCommand.scala | 76 ++++++++
.../spark/sql/execution/strategy/DDLStrategy.scala | 8 +-
5 files changed, 314 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 61b2987..8e96aab 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -16,11 +16,7 @@
*/
package org.apache.carbondata.core.metadata.schema.table;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -40,7 +36,7 @@ import com.google.gson.annotations.SerializedName;
/**
* Persisting the table information
*/
-public class TableSchema implements Serializable, Writable {
+public class TableSchema implements Serializable, Writable, Cloneable {
/**
* serialization version
@@ -71,7 +67,7 @@ public class TableSchema implements Serializable, Writable {
private SchemaEvolution schemaEvolution;
/**
- * contains all key value pairs for table properties set by user in craete DDL
+ * contains all key value pairs for table properties set by user in create DDL
*/
private Map<String, String> tableProperties;
@@ -311,4 +307,25 @@ public class TableSchema implements Serializable, Writable {
return new TableSchemaBuilder();
}
+ /**
+ * make a deep copy of TableSchema
+ */
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ this.write(dos);
+ dos.close();
+ bos.close();
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ TableSchema tableSchema = (TableSchema) super.clone();
+ tableSchema.readFields(dis);
+ return tableSchema;
+ } catch (IOException e) {
+ throw new RuntimeException("Error occurred while cloning TableSchema", e);
+ }
+ }
+
}
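Note on the clone() added above: it deep-copies the schema by serializing it through the existing Writable methods and reading the bytes back, so mutable members such as the column list and the property map end up unshared between the original and the copy. A self-contained Scala sketch of the same pattern, with a toy class standing in for TableSchema (hypothetical, for illustration only):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import scala.collection.mutable

    class Box(var name: String, var props: mutable.Map[String, String]) extends Cloneable {
      def write(out: DataOutputStream): Unit = {
        out.writeUTF(name)
        out.writeInt(props.size)
        props.foreach { case (k, v) => out.writeUTF(k); out.writeUTF(v) }
      }

      def readFields(in: DataInputStream): Unit = {
        name = in.readUTF()
        props = mutable.Map.empty
        for (_ <- 0 until in.readInt()) props += (in.readUTF() -> in.readUTF())
      }

      override def clone(): Box = {
        val bos = new ByteArrayOutputStream()
        val dos = new DataOutputStream(bos)
        write(dos)                                   // serialize every field into the buffer
        dos.close()
        val copy = super.clone().asInstanceOf[Box]   // shallow copy of the object first
        // readFields then rebuilds fresh, unshared members on that copy
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray)))
        copy
      }
    }

Cloning a Box and then mutating copy.props leaves the original's map untouched, which is the property the new TableSchema.clone() relies on.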
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
index 35238dc..be61269 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
@@ -45,14 +45,6 @@ class TestCreateTableIfNotExists extends QueryTest with BeforeAndAfterAll {
}
}
- test("test blocking of create table like command") {
- sql("create table sourceTable(name string) stored by 'carbondata'")
- val exception = intercept[MalformedCarbonCommandException] {
- sql("create table targetTable like sourceTable")
- }
- assert(exception.getMessage.contains("Operation not allowed, when source table is carbon table"))
- }
-
test("test create table if not exist concurrently") {
val executorService: ExecutorService = Executors.newFixedThreadPool(10)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
new file mode 100644
index 0000000..29df400
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.util
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class TestCreateTableLike extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+ override protected def beforeAll(): Unit = {
+ sql("drop table if exists pt_tbl")
+ sql("drop table if exists hive_pt")
+ sql("drop table if exists bkt_tbl")
+ sql("drop table if exists stream_tbl")
+ sql("drop table if exists sourceTable")
+ sql(
+ """create table sourceTable
+ |(a int, b string)
+ |stored by 'carbondata'
+ |TBLPROPERTIES(
+ | 'SORT_COLUMNS'='b',
+ | 'SORT_SCOPE'='GLOBAL_SORT',
+ | 'DICTIONARY_INCLUDE'='b',
+ | 'LOCAL_DICTIONARY_ENABLE'='false',
+ | 'carbon.column.compress'='zstd',
+ | 'CACHE_LEVEL'='blocklet',
+ | 'COLUMN_META_CACHE'='a',
+ | 'TABLE_BLOCKSIZE'='256',
+ | 'TABLE_BLOCKLET_SIZE'='16')
+ | """.stripMargin)
+ sql("insert into sourceTable values(5,'bb'),(6,'cc')")
+ }
+
+ override protected def afterAll(): Unit = {
+ sql("drop table if exists pt_tbl")
+ sql("drop table if exists hive_pt")
+ sql("drop table if exists bkt_tbl")
+ sql("drop table if exists stream_tbl")
+ sql("drop table if exists sourceTable")
+ }
+
+ override protected def beforeEach(): Unit = {
+ sql("drop table if exists targetTable")
+ }
+
+ override protected def afterEach(): Unit = {
+ sql("drop table if exists targetTable")
+ }
+
+ def checkColumns(left: util.List[ColumnSchema], right: util.List[ColumnSchema]): Boolean = {
+ if (left.size != right.size) {
+ false
+ } else {
+ for(i <- 0 until left.size()) {
+ if (!left.get(i).equals(right.get(i))) {
+ return false
+ }
+ }
+ true
+ }
+ }
+
+ def checkTableProperties(src: TableIdentifier, dst: TableIdentifier): Unit = {
+ val info_src = CarbonEnv.getCarbonTable(src)(sqlContext.sparkSession).getTableInfo
+ val info_dst = CarbonEnv.getCarbonTable(dst)(sqlContext.sparkSession).getTableInfo
+
+ val fact_src = info_src.getFactTable
+ val fact_dst = info_dst.getFactTable
+
+ // check column schemas same
+ assert(checkColumns(fact_src.getListOfColumns, fact_dst.getListOfColumns))
+
+ // check table properties same
+ assert(fact_src.getTableProperties.equals(fact_dst.getTableProperties))
+
+ // check transaction same
+ assert(!(info_src.isTransactionalTable ^ info_dst.isTransactionalTable))
+
+ // check bucket info same
+ if (null == fact_src.getBucketingInfo) {
+ assert(null == fact_dst.getBucketingInfo)
+ } else {
+ assert(null != fact_dst.getBucketingInfo)
+ assert(fact_src.getBucketingInfo.getNumOfRanges == fact_dst.getBucketingInfo.getNumOfRanges)
+ assert(checkColumns(fact_src.getBucketingInfo.getListOfColumns, fact_dst.getBucketingInfo.getListOfColumns))
+ }
+
+ // check partition info same
+ if (null == fact_src.getPartitionInfo) {
+ assert(null == fact_dst.getPartitionInfo)
+ } else {
+ assert(null != fact_dst.getPartitionInfo)
+ assert(fact_src.getPartitionInfo.getPartitionType == fact_dst.getPartitionInfo.getPartitionType)
+ assert(checkColumns(fact_src.getPartitionInfo.getColumnSchemaList, fact_dst.getPartitionInfo.getColumnSchemaList))
+ }
+
+ // check different id
+ assert(!info_src.getTableUniqueName.equals(info_dst.getTableUniqueName))
+ assert(!info_src.getOrCreateAbsoluteTableIdentifier().getTablePath.equals(info_dst.getOrCreateAbsoluteTableIdentifier.getTablePath))
+ assert(!info_src.getFactTable.getTableId.equals(info_dst.getFactTable.getTableId))
+ assert(!info_src.getFactTable.getTableName.equals(info_dst.getFactTable.getTableName))
+ }
+
+ test("create table like simple table") {
+ sql("create table targetTable like sourceTable")
+ checkTableProperties(TableIdentifier("sourceTable"), TableIdentifier("targetTable"))
+ }
+
+ test("test same table name") {
+ val exception = intercept[TableAlreadyExistsException] {
+ sql("create table sourceTable like sourceTable")
+ }
+ assert(exception.getMessage.contains("already exists in database"))
+ }
+
+ test("command with location") {
+ sql(s"create table targetTable like sourceTable location
'$warehouse/tbl_with_loc' ")
+ checkTableProperties(TableIdentifier("sourceTable"),
TableIdentifier("targetTable"))
+ }
+
+ test("table with datamap") {
+ // datamap relation does not store in parent table
+ sql(
+ s"""
+ | CREATE DATAMAP dm1 ON TABLE sourceTable
+ | USING 'bloomfilter'
+ | DMProperties('INDEX_COLUMNS'='b', 'BLOOM_SIZE'='32000')
+ """.stripMargin)
+ sql("create table targetTable like sourceTable")
+ checkTableProperties(TableIdentifier("sourceTable"), TableIdentifier("targetTable"))
+ }
+
+ test("table with partition") {
+ sql("""
+ | CREATE TABLE IF NOT EXISTS pt_tbl(
+ | a int, b string
+ | )
+ | PARTITIONED BY (area string)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='Asia, America, Europe')
+ """.stripMargin)
+ sql("create table targetTable like pt_tbl")
+ checkTableProperties(TableIdentifier("pt_tbl"),
TableIdentifier("targetTable"))
+ }
+
+ test("table with hive partition") {
+ sql(
+ """
+ | CREATE TABLE hive_pt (
+ | a int, b string)
+ | PARTITIONED BY (id int)
+ | STORED BY 'carbondata'
+ """.stripMargin)
+ sql("create table targetTable like hive_pt")
+ checkTableProperties(TableIdentifier("hive_pt"),
TableIdentifier("targetTable"))
+ }
+
+ test("table with bucket") {
+ sql("""
+ | CREATE TABLE IF NOT EXISTS bkt_tbl (
+ | a int, b string
+ | ) STORED BY 'carbondata'
+ | TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='b')
+ | """.stripMargin)
+
+ sql("create table targetTable like bkt_tbl")
+ checkTableProperties(TableIdentifier("bkt_tbl"),
TableIdentifier("targetTable"))
+ }
+
+ test("table with streaming") {
+ sql("""
+ | CREATE TABLE IF NOT EXISTS stream_tbl (
+ | a int, b string
+ | ) STORED BY 'carbondata'
+ | TBLPROPERTIES ('streaming' = 'true')
+ | """.stripMargin)
+
+ sql("create table targetTable like stream_tbl")
+ checkTableProperties(TableIdentifier("stream_tbl"),
TableIdentifier("targetTable"))
+ }
+
+ test("table with schema changed") {
+ sql("ALTER TABLE sourceTable ADD COLUMNS(charField STRING) " +
+ "TBLPROPERTIES ('DEFAULT.VALUE.charfield'='def')")
+ sql("create table targetTable like sourceTable")
+ checkTableProperties(TableIdentifier("sourceTable"), TableIdentifier("targetTable"))
+ }
+
+}
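Note: since the command copies only metadata, a natural extra check in the style of this suite would assert that the target starts empty while the source keeps its two rows from beforeAll (hypothetical test, not part of this commit; Row comes from org.apache.spark.sql):

    test("target table starts empty") {
      sql("create table targetTable like sourceTable")
      checkAnswer(sql("select count(*) from targetTable"), Seq(Row(0)))
      checkAnswer(sql("select count(*) from sourceTable"), Seq(Row(2)))
    }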
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
new file mode 100644
index 0000000..4e8911b
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import java.util
+import java.util.UUID
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.MetadataCommand
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.schema.table.{TableInfo, TableSchema}
+
+/**
+* The syntax of using this command in SQL is:
+* {{{
+* CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+* LIKE [other_db_name.]existing_table_name
+* }}}
+*/
+case class CarbonCreateTableLikeCommand(
+ sourceTable: TableIdentifier,
+ targetTable: TableIdentifier,
+ ifNotExists: Boolean = false) extends MetadataCommand {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ val srcTable = CarbonEnv.getCarbonTable(sourceTable.database, sourceTable.table)(sparkSession)
+ if (!srcTable.isTransactionalTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on non
transactional table")
+ }
+ if (srcTable.isChildTable || srcTable.isChildDataMap) {
+ throw new MalformedCarbonCommandException("Unsupported operation on child table or datamap")
+ }
+
+ // copy schema of source table and update fields to target table
+ val dstTableSchema = srcTable.getTableInfo.getFactTable.clone().asInstanceOf[TableSchema]
+ dstTableSchema.setTableName(targetTable.table)
+ dstTableSchema.setTableId(UUID.randomUUID().toString)
+
+ val schemaEvol: SchemaEvolution = new SchemaEvolution
+ val schEntryList: util.List[SchemaEvolutionEntry] = new util.ArrayList[SchemaEvolutionEntry]
+ schemaEvol.setSchemaEvolutionEntryList(schEntryList)
+ dstTableSchema.setSchemaEvolution(schemaEvol)
+
+ // build table info for creating table
+ val dstTableInfo = new TableInfo
+ val dstDB = targetTable.database.getOrElse(sparkSession.catalog.currentDatabase)
+ dstTableInfo.setDatabaseName(dstDB)
+ dstTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+ dstTableInfo.setFactTable(dstTableSchema)
+
+ CarbonCreateTableCommand(dstTableInfo, ifNotExists).run(sparkSession)
+ }
+
+ override protected def opName: String = "CREATE TABLE LIKE"
+}
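Note: the command clones the source fact-table schema, assigns a fresh table id and name, clears the schema-evolution history, and then delegates the actual creation to CarbonCreateTableCommand. A sketch of driving it directly rather than through SQL (assumes a CarbonData-enabled SparkSession named spark, and that MetadataCommand's run dispatches to processMetadata as in the rest of the command hierarchy):

    import org.apache.spark.sql.catalyst.TableIdentifier

    val cmd = CarbonCreateTableLikeCommand(
      sourceTable = TableIdentifier("sourceTable"),
      targetTable = TableIdentifier("targetTable"),
      ifNotExists = true)
    cmd.run(spark)  // builds the cloned TableInfo and creates targetTable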
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 3ef8cfa..37a33ec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand}
import org.apache.spark.sql.execution.command.schema._
-import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
@@ -37,8 +37,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, Spa
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.util.Util
@@ -97,8 +95,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
.tableExists(createLikeTable.sourceTable)(sparkSession)
if (isCarbonTable) {
- throw new MalformedCarbonCommandException(
- "Operation not allowed, when source table is carbon table")
+ ExecutedCommandExec(CarbonCreateTableLikeCommand(createLikeTable.sourceTable,
+ createLikeTable.targetTable, createLikeTable.ifNotExists)) :: Nil
} else {
ExecutedCommandExec(createLikeTable) :: Nil
}
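With this strategy change, CREATE TABLE ... LIKE on a carbon source table is now dispatched to CarbonCreateTableLikeCommand instead of being rejected, while non-carbon source tables still fall through to Spark's own command. An end-to-end sketch (assumes a CarbonData-enabled SparkSession named spark; table names are illustrative):

    spark.sql("create table src(a int, b string) stored by 'carbondata'")
    spark.sql("create table dst like src")
    spark.sql("describe formatted dst").show(false)  // mirrors src's columns and table properties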