This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e423e9  [CARBONDATA-3518] Support create table like command
9e423e9 is described below

commit 9e423e9a46d443ef82f945c707319ba533eb3cb7
Author: Manhua <[email protected]>
AuthorDate: Wed Sep 11 17:43:58 2019 +0800

    [CARBONDATA-3518] Support create table like command
    
    Support CREATE TABLE ... LIKE SQL syntax for carbon table
    
    This closes #3382
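    
    A minimal usage sketch of the new syntax, runnable from a Spark shell
    (table and column names are illustrative, not taken from this commit):
    
        spark.sql("CREATE TABLE src_tbl (a INT, b STRING) STORED BY 'carbondata'")
        // copies src_tbl's schema and table properties into a new, empty table
        spark.sql("CREATE TABLE IF NOT EXISTS dst_tbl LIKE src_tbl")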
---
 .../core/metadata/schema/table/TableSchema.java    |  31 ++-
 .../createTable/TestCreateTableIfNotExists.scala   |   8 -
 .../createTable/TestCreateTableLike.scala          | 211 +++++++++++++++++++++
 .../table/CarbonCreateTableLikeCommand.scala       |  76 ++++++++
 .../spark/sql/execution/strategy/DDLStrategy.scala |   8 +-
 5 files changed, 314 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 61b2987..8e96aab 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -16,11 +16,7 @@
  */
 package org.apache.carbondata.core.metadata.schema.table;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -40,7 +36,7 @@ import com.google.gson.annotations.SerializedName;
 /**
  * Persisting the table information
  */
-public class TableSchema implements Serializable, Writable {
+public class TableSchema implements Serializable, Writable, Cloneable {
 
   /**
    * serialization version
@@ -71,7 +67,7 @@ public class TableSchema implements Serializable, Writable {
   private SchemaEvolution schemaEvolution;
 
   /**
-   * contains all key value pairs for table properties set by user in craete DDL
+   * contains all key value pairs for table properties set by user in create DDL
    */
   private Map<String, String> tableProperties;
 
@@ -311,4 +307,25 @@ public class TableSchema implements Serializable, Writable {
     return new TableSchemaBuilder();
   }
 
+  /**
+   * make a deep copy of TableSchema
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      this.write(dos);
+      dos.close();
+      bos.close();
+
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+      TableSchema tableSchema = (TableSchema) super.clone();
+      tableSchema.readFields(dis);
+      return tableSchema;
+    } catch (IOException e) {
+      throw new RuntimeException("Error occur while cloning TableSchema", e);
+    }
+  }
+
 }
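
The clone() added above makes a deep copy by round-tripping the object through
its own Writable serialization: write() dumps every field into a byte buffer,
and readFields() repopulates the shallow copy returned by super.clone(). A
minimal standalone Scala sketch of the same pattern (Thing and its
write/readFields are illustrative, not CarbonData APIs):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    class Thing(var name: String) extends Cloneable {
      def write(out: DataOutputStream): Unit = out.writeUTF(name)
      def readFields(in: DataInputStream): Unit = { name = in.readUTF() }

      override def clone(): Thing = {
        val bos = new ByteArrayOutputStream()
        val dos = new DataOutputStream(bos)
        write(dos)    // serialize the current state
        dos.close()
        val copy = super.clone().asInstanceOf[Thing]
        // read the serialized state back into the copy, deep-copying all fields
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray)))
        copy
      }
    }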
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
index 35238dc..be61269 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
@@ -45,14 +45,6 @@ class TestCreateTableIfNotExists extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test blocking of create table like command") {
-    sql("create table sourceTable(name string) stored by 'carbondata'")
-    val exception = intercept[MalformedCarbonCommandException] {
-      sql("create table targetTable like sourceTable")
-    }
-    assert(exception.getMessage.contains("Operation not allowed, when source table is carbon table"))
-  }
-
   test("test create table if not exist concurrently") {
 
     val executorService: ExecutorService = Executors.newFixedThreadPool(10)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
new file mode 100644
index 0000000..29df400
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.util
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class TestCreateTableLike extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll{
+
+  override protected def beforeAll(): Unit = {
+    sql("drop table if exists pt_tbl")
+    sql("drop table if exists hive_pt")
+    sql("drop table if exists bkt_tbl")
+    sql("drop table if exists stream_tbl")
+    sql("drop table if exists sourceTable")
+    sql(
+      """create table sourceTable
+        |(a int, b string)
+        |stored by 'carbondata'
+        |TBLPROPERTIES(
+        |  'SORT_COLUMNS'='b',
+        |  'SORT_SCOPE'='GLOBAL_SORT',
+        |  'DICTIONARY_INCLUDE'='b',
+        |  'LOCAL_DICTIONARY_ENABLE'='false',
+        |  'carbon.column.compress'='zstd',
+        |  'CACHE_LEVEL'='blocklet',
+        |  'COLUMN_META_CACHE'='a',
+        |  'TABLE_BLOCKSIZE'='256',
+        |  'TABLE_BLOCKLET_SIZE'='16')
+        |  """.stripMargin)
+    sql("insert into sourceTable values(5,'bb'),(6,'cc')")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("drop table if exists pt_tbl")
+    sql("drop table if exists hive_pt")
+    sql("drop table if exists bkt_tbl")
+    sql("drop table if exists stream_tbl")
+    sql("drop table if exists sourceTable")
+  }
+
+  override protected def beforeEach(): Unit = {
+    sql("drop table if exists targetTable")
+  }
+
+  override protected def afterEach(): Unit = {
+    sql("drop table if exists targetTable")
+  }
+
+  def checkColumns(left: util.List[ColumnSchema], right: util.List[ColumnSchema]): Boolean = {
+    if (left.size != right.size) {
+      false
+    } else {
+      for(i <- 0 until left.size()) {
+        if (!left.get(i).equals(right.get(i))) {
+          return false
+        }
+      }
+      true
+    }
+  }
+
+  def checkTableProperties(src: TableIdentifier, dst: TableIdentifier): Unit = {
+    val info_src = CarbonEnv.getCarbonTable(src)(sqlContext.sparkSession).getTableInfo
+    val info_dst = CarbonEnv.getCarbonTable(dst)(sqlContext.sparkSession).getTableInfo
+
+    val fact_src = info_src.getFactTable
+    val fact_dst = info_dst.getFactTable
+
+    // check column schemas same
+    assert(checkColumns(fact_src.getListOfColumns, fact_dst.getListOfColumns))
+
+    // check table properties same
+    assert(fact_src.getTableProperties.equals(fact_dst.getTableProperties))
+
+    // check transaction same
+    assert(!(info_src.isTransactionalTable ^ info_dst.isTransactionalTable))
+
+    // check bucket info same
+    if (null == fact_src.getBucketingInfo) {
+      assert(null == fact_dst.getBucketingInfo)
+    } else {
+      assert(null != fact_dst.getBucketingInfo)
+      assert(fact_src.getBucketingInfo.getNumOfRanges == fact_dst.getBucketingInfo.getNumOfRanges)
+      assert(checkColumns(fact_src.getBucketingInfo.getListOfColumns,fact_dst.getBucketingInfo.getListOfColumns))
+    }
+
+    // check partition info same
+    if (null == fact_src.getPartitionInfo) {
+      assert(null == fact_dst.getPartitionInfo)
+    } else {
+      assert(null != fact_dst.getPartitionInfo)
+      assert(fact_src.getPartitionInfo.getPartitionType == fact_dst.getPartitionInfo.getPartitionType)
+      assert(checkColumns(fact_src.getPartitionInfo.getColumnSchemaList, fact_dst.getPartitionInfo.getColumnSchemaList))
+    }
+
+    // check different id
+    assert(!info_src.getTableUniqueName.equals(info_dst.getTableUniqueName))
+    assert(!info_src.getOrCreateAbsoluteTableIdentifier().getTablePath.equals(info_dst.getOrCreateAbsoluteTableIdentifier.getTablePath))
+    assert(!info_src.getFactTable.getTableId.equals(info_dst.getFactTable.getTableId))
+    assert(!info_src.getFactTable.getTableName.equals(info_dst.getFactTable.getTableName))
+  }
+
+  test("create table like simple table") {
+    sql("create table targetTable like sourceTable")
+    checkTableProperties(TableIdentifier("sourceTable"), 
TableIdentifier("targetTable"))
+  }
+
+  test("test same table name") {
+    val exception = intercept[TableAlreadyExistsException] {
+      sql("create table sourceTable like sourceTable")
+    }
+    assert(exception.getMessage.contains("already exists in database"))
+  }
+
+  test("command with location") {
+    sql(s"create table targetTable like sourceTable location 
'$warehouse/tbl_with_loc' ")
+    checkTableProperties(TableIdentifier("sourceTable"), 
TableIdentifier("targetTable"))
+  }
+
+  test("table with datamap") {
+    // the datamap relation is not stored in the parent table
+    sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE sourceTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='b', 'BLOOM_SIZE'='32000')
+      """.stripMargin)
+    sql("create table targetTable like sourceTable")
+    checkTableProperties(TableIdentifier("sourceTable"), 
TableIdentifier("targetTable"))
+  }
+
+  test("table with partition") {
+    sql("""
+          | CREATE TABLE IF NOT EXISTS pt_tbl(
+          |   a int, b string
+          | )
+          | PARTITIONED BY (area string)
+          | STORED BY 'carbondata'
+          | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+          | 'LIST_INFO'='Asia, America, Europe')
+        """.stripMargin)
+    sql("create table targetTable like pt_tbl")
+    checkTableProperties(TableIdentifier("pt_tbl"), 
TableIdentifier("targetTable"))
+  }
+
+  test("table with hive partition") {
+    sql(
+      """
+        | CREATE TABLE hive_pt (
+        | a int, b string)
+        | PARTITIONED BY (id int)
+        | STORED BY 'carbondata'
+      """.stripMargin)
+    sql("create table targetTable like hive_pt")
+    checkTableProperties(TableIdentifier("hive_pt"), 
TableIdentifier("targetTable"))
+  }
+
+  test("table with bucket") {
+    sql("""
+        | CREATE TABLE IF NOT EXISTS bkt_tbl (
+        |   a int, b string
+        | ) STORED BY 'carbondata'
+        | TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='b')
+        | """.stripMargin)
+
+    sql("create table targetTable like bkt_tbl")
+    checkTableProperties(TableIdentifier("bkt_tbl"), 
TableIdentifier("targetTable"))
+  }
+
+  test("table with streaming") {
+    sql("""
+          | CREATE TABLE IF NOT EXISTS stream_tbl (
+          |   a int, b string
+          | ) STORED BY 'carbondata'
+          | TBLPROPERTIES ('streaming' = 'true')
+          | """.stripMargin)
+
+    sql("create table targetTable like stream_tbl")
+    checkTableProperties(TableIdentifier("stream_tbl"), 
TableIdentifier("targetTable"))
+  }
+
+  test("table with schema changed") {
+    sql("ALTER TABLE sourceTable ADD COLUMNS(charField STRING) " +
+      "TBLPROPERTIES ('DEFAULT.VALUE.charfield'='def')")
+    sql("create table targetTable like sourceTable")
+    checkTableProperties(TableIdentifier("sourceTable"), 
TableIdentifier("targetTable"))
+  }
+
+}
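
The suite above checks that the cloned table shares column schemas, table
properties, bucketing and partition info with its source, while getting a
fresh table name, table id and table path. A quick spark-shell spot check of
the simplest case (names are illustrative):

    spark.sql("CREATE TABLE src (a INT, b STRING) STORED BY 'carbondata'")
    spark.sql("CREATE TABLE dst LIKE src")
    // expect the same columns and TBLPROPERTIES as src, under a new table path
    spark.sql("DESC FORMATTED dst").show(100, false)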
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
new file mode 100644
index 0000000..4e8911b
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import java.util
+import java.util.UUID
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.MetadataCommand
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.schema.table.{TableInfo, TableSchema}
+
+/**
+* The syntax of using this command in SQL is:
+* {{{
+*   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+*   LIKE [other_db_name.]existing_table_name
+* }}}
+*/
+case class CarbonCreateTableLikeCommand(
+    sourceTable: TableIdentifier,
+    targetTable: TableIdentifier,
+    ifNotExists: Boolean = false) extends MetadataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val srcTable = CarbonEnv.getCarbonTable(sourceTable.database, sourceTable.table)(sparkSession)
+    if (!srcTable.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
+    }
+    if (srcTable.isChildTable || srcTable.isChildDataMap) {
+      throw new MalformedCarbonCommandException("Unsupported operation on 
child table or datamap")
+    }
+
+    // copy schema of source table and update fields to target table
+    val dstTableSchema = srcTable.getTableInfo.getFactTable.clone().asInstanceOf[TableSchema]
+    dstTableSchema.setTableName(targetTable.table)
+    dstTableSchema.setTableId(UUID.randomUUID().toString)
+
+    val schemaEvol: SchemaEvolution = new SchemaEvolution
+    val schEntryList: util.List[SchemaEvolutionEntry] = new util.ArrayList[SchemaEvolutionEntry]
+    schemaEvol.setSchemaEvolutionEntryList(schEntryList)
+    dstTableSchema.setSchemaEvolution(schemaEvol)
+
+    // build table info for creating table
+    val dstTableInfo = new TableInfo
+    val dstDB = targetTable.database.getOrElse(sparkSession.catalog.currentDatabase)
+    dstTableInfo.setDatabaseName(dstDB)
+    dstTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    dstTableInfo.setFactTable(dstTableSchema)
+
+    CarbonCreateTableCommand(dstTableInfo, ifNotExists).run(sparkSession)
+  }
+
+  override protected def opName: String = "CREATE TABLE LIKE"
+}
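
processMetadata() above clones the source fact table's schema, assigns a new
table name, a random UUID as table id and an empty schema-evolution history,
then hands the rebuilt TableInfo to CarbonCreateTableCommand. The command can
also be constructed directly, bypassing the SQL parser (identifiers are
illustrative; spark is an active SparkSession):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.table.CarbonCreateTableLikeCommand

    // normally DDLStrategy builds this command from the parsed SQL
    CarbonCreateTableLikeCommand(
      sourceTable = TableIdentifier("src_tbl", Some("default")),
      targetTable = TableIdentifier("dst_tbl", Some("default")),
      ifNotExists = true).run(spark)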
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 3ef8cfa..37a33ec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
 import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand}
 import org.apache.spark.sql.execution.command.schema._
-import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
@@ -37,8 +37,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, Spa
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.util.Util
 
@@ -97,8 +95,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
           .tableExists(createLikeTable.sourceTable)(sparkSession)
         if (isCarbonTable) {
-          throw new MalformedCarbonCommandException(
-            "Operation not allowed, when source table is carbon table")
+          ExecutedCommandExec(CarbonCreateTableLikeCommand(createLikeTable.sourceTable,
+            createLikeTable.targetTable, createLikeTable.ifNotExists)) :: Nil
         } else {
           ExecutedCommandExec(createLikeTable) :: Nil
         }
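
Net effect of the DDLStrategy change: CREATE TABLE ... LIKE on a carbon source
table is now planned as CarbonCreateTableLikeCommand instead of being rejected,
while non-carbon sources still fall through to Spark's built-in command. The
surrounding match case, in simplified form:

    case createLikeTable: CreateTableLikeCommand =>
      val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
        .tableExists(createLikeTable.sourceTable)(sparkSession)
      if (isCarbonTable) {
        ExecutedCommandExec(CarbonCreateTableLikeCommand(createLikeTable.sourceTable,
          createLikeTable.targetTable, createLikeTable.ifNotExists)) :: Nil
      } else {
        ExecutedCommandExec(createLikeTable) :: Nil   // let Spark handle it
      }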
