Repository: carbondata Updated Branches: refs/heads/master 3647aee3c -> 8bda43b05
[CARBONDATA-2262] Support the syntax of 'using CARBONDATA' to create table Add new function to Support the syntax of 'using CARBONDATA' to create table, for example: CREATE TABLE src_carbondata1(key INT, value STRING) using carbondata Be sure to do all of the following checklist to help us incorporate your contribution. This closes #2081 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8bda43b0 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8bda43b0 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8bda43b0 Branch: refs/heads/master Commit: 8bda43b05dc64f692a6885f2228f60cb2a27ca6a Parents: 3647aee Author: root <[email protected]> Authored: Tue Mar 20 16:26:13 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Mar 29 19:25:53 2018 +0800 ---------------------------------------------------------------------- .../examples/SparkSessionExample.scala | 2 +- .../hive/CarbonHiveMetastoreListener.scala | 10 +-- .../sql/commands/UsingCarbondataSuite.scala | 72 ++++++++++++++++++++ .../sql/execution/strategy/DDLStrategy.scala | 6 +- .../spark/sql/hive/CarbonFileMetastore.scala | 6 +- 5 files changed, 87 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bda43b0/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala index 5155e36..1164658 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala @@ -78,7 +78,7 @@ object SparkSessionExample { | 
dateField DATE, | charField CHAR(5) | ) - | USING org.apache.spark.sql.CarbonSource + | USING carbondata | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField', | 'dbName'='default', 'tableName'='sparksession_table') """.stripMargin) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bda43b0/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala b/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala index f7ee0ee..3f0f4f0 100644 --- a/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala +++ b/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala @@ -33,8 +33,9 @@ class CarbonHiveMetastoreListener(conf: Configuration) extends MetaStorePreEvent case CREATE_TABLE => val table = preEventContext.asInstanceOf[PreCreateTableEvent].getTable val tableProps = table.getParameters - if (tableProps != null && - tableProps.get("spark.sql.sources.provider") == "org.apache.spark.sql.CarbonSource") { + if (tableProps != null + && (tableProps.get("spark.sql.sources.provider") == "org.apache.spark.sql.CarbonSource" + || tableProps.get("spark.sql.sources.provider").equalsIgnoreCase("carbondata"))) { val numSchemaParts = tableProps.get("spark.sql.sources.schema.numParts") if (numSchemaParts != null && !numSchemaParts.isEmpty) { val parts = (0 until numSchemaParts.toInt).map { index => @@ -61,8 +62,9 @@ class CarbonHiveMetastoreListener(conf: Configuration) extends MetaStorePreEvent case ALTER_TABLE => val table = preEventContext.asInstanceOf[PreAlterTableEvent].getNewTable val tableProps = table.getParameters - if (tableProps != null && - tableProps.get("spark.sql.sources.provider") == "org.apache.spark.sql.CarbonSource") { + if (tableProps != null + && 
(tableProps.get("spark.sql.sources.provider") == "org.apache.spark.sql.CarbonSource" + || tableProps.get("spark.sql.sources.provider").equalsIgnoreCase("carbondata"))) { val numSchemaParts = tableProps.get("spark.sql.sources.schema.numParts") if (numSchemaParts != null && !numSchemaParts.isEmpty) { val schemaParts = (0 until numSchemaParts.toInt).map { index => http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bda43b0/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala new file mode 100644 index 0000000..37d65b4 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sql.commands + +import scala.collection.mutable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterEach + +import org.apache.carbondata.core.constants.CarbonCommonConstants + +class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach { + override def beforeEach(): Unit = { + sql("DROP TABLE IF EXISTS src_carbondata1") + sql("DROP TABLE IF EXISTS tableSize3") + } + + override def afterEach(): Unit = { + sql("DROP TABLE IF EXISTS src_carbondata1") + sql("DROP TABLE IF EXISTS tableSize3") + } + + test("CARBONDATA-2262: test check results of table with complex data type and bucketing") { + sql("DROP TABLE IF EXISTS create_source") + sql("CREATE TABLE create_source(intField INT, stringField STRING, complexField ARRAY<INT>) " + + "USING carbondata") + sql("""INSERT INTO create_source VALUES(1,"source","1$2$3")""") + checkAnswer(sql("SELECT * FROM create_source"), Row(1, "source", mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3))) + sql("DROP TABLE IF EXISTS create_source") + } + + test("CARBONDATA-2262: Support the syntax of 'USING CarbonData' whithout tableName") { + sql("CREATE TABLE src_carbondata1(key INT, value STRING) USING carbondata") + sql("INSERT INTO src_carbondata1 VALUES(1,'source')") + checkAnswer(sql("SELECT * FROM src_carbondata1"), Row(1, "source")) + } + + test("CARBONDATA-2262: Support the syntax of 'STORED AS carbondata, get data size and index size after minor compaction") { + sql("CREATE TABLE tableSize3 (empno INT, workgroupcategory STRING, deptno INT, projectcode INT, attendance INT) USING carbondata") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA LOCAL INPATH 
'$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql("ALTER TABLE tableSize3 COMPACT 'minor'") + checkExistence(sql("DESCRIBE FORMATTED tableSize3"), true, CarbonCommonConstants.TABLE_DATA_SIZE) + checkExistence(sql("DESCRIBE FORMATTED tableSize3"), true, CarbonCommonConstants.TABLE_INDEX_SIZE) + val res3 = sql("DESCRIBE FORMATTED tableSize3").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res3.length == 2) + res3.foreach(row => assert(row.getString(1).trim.toLong > 0)) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bda43b0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index b20349c..f9da0a7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -207,7 +207,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { ExecutedCommandExec(CarbonResetCommand()) :: Nil case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None) if 
tableDesc.provider.get != DDLUtils.HIVE_PROVIDER - && tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") => + && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") + || tableDesc.provider.get.equalsIgnoreCase("carbondata")) => val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession) val cmd = @@ -215,7 +216,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { ExecutedCommandExec(cmd) :: Nil case CreateDataSourceTableCommand(table, ignoreIfExists) if table.provider.get != DDLUtils.HIVE_PROVIDER - && table.provider.get.equals("org.apache.spark.sql.CarbonSource") => + && (table.provider.get.equals("org.apache.spark.sql.CarbonSource") + || table.provider.get.equalsIgnoreCase("carbondata")) => val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession) val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists) ExecutedCommandExec(cmd) :: Nil http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bda43b0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index ea0dd3a..7a8601a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -156,7 +156,8 @@ class CarbonFileMetastore extends CarbonMetaStore { val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable] catalogTable.provider match { - case Some(name) if name.equals("org.apache.spark.sql.CarbonSource") => name + case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource") + || 
name.equalsIgnoreCase("carbondata")) => name case _ => throw new NoSuchTableException(database, tableIdentifier.table) } val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( @@ -549,7 +550,8 @@ class CarbonFileMetastore extends CarbonMetaStore { val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable] catalogTable.provider match { - case Some(name) if name.equals("org.apache.spark.sql.CarbonSource") => name + case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource") + || name.equalsIgnoreCase("carbondata")) => name case _ => throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table) }
