[CARBONDATA-1968] Add external table support This PR adds support for creating external table with existing carbondata files, using Hive syntax. CREATE EXTERNAL TABLE tableName STORED BY 'carbondata' LOCATION 'path'
This closes #1749 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c5934d7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c5934d7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c5934d7 Branch: refs/heads/carbonstore-rebase Commit: 9c5934d7e0a71da5bf96db966a4fce7296c6b60c Parents: 51421fd Author: Jacky Li <jacky.li...@qq.com> Authored: Tue Jan 2 23:46:14 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Feb 9 01:39:54 2018 +0800 ---------------------------------------------------------------------- .../core/metadata/schema/table/CarbonTable.java | 9 ++ .../createTable/TestCreateExternalTable.scala | 91 ++++++++++++++++++++ .../TestDataWithDicExcludeAndInclude.scala | 10 --- .../command/table/CarbonDropTableCommand.scala | 5 +- .../spark/sql/parser/CarbonSparkSqlParser.scala | 64 +++++++++----- 5 files changed, 147 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c5934d7/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 09ff440..6036569 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -785,6 +785,15 @@ public class CarbonTable implements Serializable { && !tableInfo.getParentRelationIdentifiers().isEmpty(); } + /** + * Return true if this is an external table (table with property "_external"="true", this is + * an internal table property set during table creation) + */ + public boolean isExternalTable() 
{ + String external = tableInfo.getFactTable().getTableProperties().get("_external"); + return external != null && external.equalsIgnoreCase("true"); + } + public long size() throws IOException { Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this); Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c5934d7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala new file mode 100644 index 0000000..67370eb --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.spark.sql.{AnalysisException, CarbonEnv} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll { + + var originDataPath: String = _ + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS origin") + // create carbon table and insert data + sql("CREATE TABLE origin(key INT, value STRING) STORED BY 'carbondata'") + sql("INSERT INTO origin select 100,'spark'") + sql("INSERT INTO origin select 200,'hive'") + originDataPath = s"$storeLocation/origin" + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS origin") + } + + test("create external table with existing files") { + assert(new File(originDataPath).exists()) + sql("DROP TABLE IF EXISTS source") + + // create external table with existing files + sql( + s""" + |CREATE EXTERNAL TABLE source + |STORED BY 'carbondata' + |LOCATION '$storeLocation/origin' + """.stripMargin) + checkAnswer(sql("SELECT count(*) from source"), sql("SELECT count(*) from origin")) + + val carbonTable = CarbonEnv.getCarbonTable(None, "source")(sqlContext.sparkSession) + assert(carbonTable.isExternalTable) + + sql("DROP TABLE IF EXISTS source") + + // DROP TABLE should not delete data + assert(new File(originDataPath).exists()) + } + + test("create external table with empty folder") { + val exception = intercept[AnalysisException] { + sql( + s""" + |CREATE EXTERNAL TABLE source + |STORED BY 'carbondata' + |LOCATION './nothing' + """.stripMargin) + } + assert(exception.getMessage().contains("Invalid table path provided")) + } + + test("create external table with CTAS") { + val exception = intercept[AnalysisException] { + sql( + """ + |CREATE EXTERNAL TABLE source + |STORED BY 'carbondata' + |LOCATION './nothing' + |AS + | SELECT * FROM origin + """.stripMargin) + } + 
assert(exception.getMessage().contains("Create external table as select")) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c5934d7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala index c788857..201da39 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala @@ -90,16 +90,6 @@ class TestLoadDataWithDictionaryExcludeAndInclude extends QueryTest with BeforeA ) } - test("test create external table should fail") { - assert(intercept[AnalysisException]( - sql( - """ - | CREATE EXTERNAL TABLE t1 (id string, value int) - | STORED BY 'carbondata' - """.stripMargin) - ).message.contains("Operation not allowed: CREATE EXTERNAL TABLE")) - } - override def afterAll { dropTable CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c5934d7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 7c895ab..8001a93 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -147,7 +147,10 @@ case class CarbonDropTableCommand( // delete the table folder val tablePath = carbonTable.getTablePath val fileType = FileFactory.getFileType(tablePath) - if (FileFactory.isFileExist(tablePath, fileType)) { + + // delete table data only if it is not external table + if (FileFactory.isFileExist(tablePath, fileType) && + !carbonTable.isExternalTable) { val file = FileFactory.getCarbonFile(tablePath, fileType) CarbonUtil.deleteFoldersAndFilesSilent(file) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c5934d7/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 4b77417..ad6d0c7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.types.StructField import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.hadoop.util.SchemaReader import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil @@ -164,9 +166,6 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, if (bucketSpecContext != null) { operationNotAllowed("CREATE TABLE ... 
CLUSTERED BY", bucketSpecContext) } - if (external) { - operationNotAllowed("CREATE EXTERNAL TABLE", tableHeader) - } val cols = Option(columns).toSeq.flatMap(visitColTypeList) val properties = getPropertyKeyValues(tablePropertyList) @@ -231,6 +230,10 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, operationNotAllowed( "Schema may not be specified in a Create Table As Select (CTAS) statement", columns) } + // external table is not allowed + if (external) { + operationNotAllowed("Create external table as select", tableHeader) + } fields = parser .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get)) @@ -242,29 +245,48 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, } // validate tblProperties val bucketFields = parser.getBucketFields(tableProperties, fields, options) - // prepare table model of the collected tokens - val tableModel: TableModel = parser.prepareTableModel( - ifNotExists, - convertDbNameToLowerCase(tableIdentifier.database), - tableIdentifier.table.toLowerCase, - fields, - partitionFields, - tableProperties, - bucketFields, - isAlterFlow = false, - tableComment) + val tableInfo = if (external) { + // read table info from schema file in the provided table path + val identifier = AbsoluteTableIdentifier.from( + tablePath.get, + CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession), + tableIdentifier.table) + val table = try { + SchemaReader.getTableInfo(identifier) + } catch { + case e: Throwable => + operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader) + } + // set "_external" property, so that DROP TABLE will not delete the data + table.getFactTable.getTableProperties.put("_external", "true") + table + } else { + // prepare table model of the collected tokens + val tableModel: TableModel = parser.prepareTableModel( + ifNotExists, + convertDbNameToLowerCase(tableIdentifier.database), + tableIdentifier.table.toLowerCase, + fields, 
partitionFields, + tableProperties, + bucketFields, + isAlterFlow = false, + tableComment) + TableNewProcessor(tableModel) + } selectQuery match { case query@Some(q) => CarbonCreateTableAsSelectCommand( - TableNewProcessor(tableModel), - query.get, - tableModel.ifNotExistsSet, - tablePath) + tableInfo = tableInfo, + query = query.get, + ifNotExistsSet = ifNotExists, + tableLocation = tablePath) case _ => - CarbonCreateTableCommand(TableNewProcessor(tableModel), - tableModel.ifNotExistsSet, - tablePath) + CarbonCreateTableCommand( + tableInfo = tableInfo, + ifNotExistsSet = ifNotExists, + tableLocation = tablePath) } }