[CARBONDATA-2355] Support run SQL on carbondata files directly
This closes #2181 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9469e6bd Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9469e6bd Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9469e6bd Branch: refs/heads/carbonstore Commit: 9469e6bd4da5c75ba836fb550112cec01f666544 Parents: 4d22ddc Author: xubo245 <601450...@qq.com> Authored: Wed Apr 18 17:34:12 2018 +0800 Committer: chenliang613 <chenliang...@huawei.com> Committed: Fri Jun 1 18:01:33 2018 +0800 ---------------------------------------------------------------------- docs/sdk-guide.md | 7 ++ .../carbondata/examples/DirectSQLExample.scala | 100 +++++++++++++++++++ .../carbondata/examples/S3UsingSDkExample.scala | 2 +- ...FileInputFormatWithExternalCarbonTable.scala | 2 +- ...tCreateTableUsingSparkCarbonFileFormat.scala | 30 +++++- .../TestNonTransactionalCarbonTable.scala | 2 +- ...ransactionalCarbonTableWithComplexType.scala | 2 +- ...tSparkCarbonFileFormatWithSparkSession.scala | 2 +- .../datasources/SparkCarbonFileFormat.scala | 26 ++++- 9 files changed, 164 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/docs/sdk-guide.md ---------------------------------------------------------------------- diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md index 360516a..ec70919 100644 --- a/docs/sdk-guide.md +++ b/docs/sdk-guide.md @@ -128,7 +128,14 @@ Each of SQL data types are mapped into data types of SDK. Following are the mapp | STRING | DataTypes.STRING | | DECIMAL | DataTypes.createDecimalType(precision, scale) | +## Run SQL on files directly +Instead of creating table and query it, you can also query that file directly with SQL. 
+### Example +``` +SELECT * FROM carbonfile.`$Path` +``` +Find example code at [DirectSQLExample](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala) in the CarbonData repo. ## API List ### Class org.apache.carbondata.sdk.file.CarbonWriterBuilder http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala new file mode 100644 index 0000000..a011d80 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.examples + +import java.io.File + +import org.apache.commons.io.FileUtils + +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.examples.util.ExampleUtils +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} + +/** + * Running SQL on carbon files directly + * No need to create table first + * TODO: support more than one carbon file + */ +object DirectSQLExample { + + // prepare SDK writer output + def buildTestData( + path: String, + num: Int = 3, + persistSchema: Boolean = false): Any = { + + // getCanonicalPath gives path with \, but the code expects /. + val writerPath = path.replace("\\", "/"); + + val fields: Array[Field] = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + fields(2) = new Field("height", DataTypes.DOUBLE) + + try { + val builder = CarbonWriter + .builder() + .outputPath(writerPath) + .isTransactionalTable(true) + .uniqueIdentifier(System.currentTimeMillis) + .withBlockSize(2) + if (persistSchema) { + builder.persistSchemaFile(true) + } + val writer = builder.buildWriterForCSVInput(new Schema(fields)) + var i = 0 + while (i < num) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case e: Exception => throw e + } + } + + def cleanTestData(path: String): Unit = { + FileUtils.deleteDirectory(new File(path)) + } + + // scalastyle:off + def main(args: Array[String]) { + val carbonSession = ExampleUtils.createCarbonSession("DirectSQLExample") + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val path = s"$rootPath/examples/spark2/target/carbonFile/" + + import carbonSession._ + // 1. 
generate data file + cleanTestData(path) + buildTestData(path, 20) + val readPath = path + "Fact/Part0/Segment_null" + + println("Running SQL on carbon files directly") + try { + // 2. run queries directly, no need to create table first + sql(s"""select * FROM carbonfile.`$readPath` limit 10""".stripMargin).show() + } catch { + case e: Exception => throw e + } finally { + // 3.delete data files + cleanTestData(path) + } + } + // scalastyle:on +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala index 022b28e..1795960 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala @@ -36,7 +36,7 @@ object S3UsingSDKExample { num: Int = 3, persistSchema: Boolean = false): Any = { - // getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + // getCanonicalPath gives path with \, but the code expects /. 
val writerPath = path.replace("\\", "/"); val fields: Array[Field] = new Array[Field](3) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala index 019b915..e6d39d3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala @@ -38,7 +38,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be "../." + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") .getCanonicalPath - //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + //getCanonicalPath gives path with \, but the code expects /. 
writerPath = writerPath.replace("\\", "/"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala index 66be8e4..211bc8c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala @@ -46,7 +46,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd "../." + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") .getCanonicalPath - //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + //getCanonicalPath gives path with \, but the code expects /. 
writerPath = writerPath.replace("\\", "/"); val filePath = writerPath + "/Fact/Part0/Segment_null/" @@ -153,6 +153,34 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd cleanTestData() } + test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData(false) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION + |'$filePath' """.stripMargin) + } else { + // TO DO + } + + val directSQL = sql(s"""select * FROM carbonfile.`$filePath`""".stripMargin) + directSQL.show(false) + checkAnswer(sql("select * from sdkOutputTable"), directSQL) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } + test("should not allow to alter datasource carbontable ") { buildTestData(false) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 61b37d5..0083733 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -55,7 +55,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { "../." + "./target/SparkCarbonFileFormat/WriterOutput/") .getCanonicalPath - //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + //getCanonicalPath gives path with \, but the code expects /. writerPath = writerPath.replace("\\", "/") def buildTestDataSingleFile(): Any = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala index d4de428..19aaf72 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala @@ -39,7 +39,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo "../." + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") .getCanonicalPath - //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + //getCanonicalPath gives path with \, but the code expects /. 
writerPath = writerPath.replace("\\", "/") http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala index 54b23a5..79b64ae 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala @@ -36,7 +36,7 @@ object TestSparkCarbonFileFormatWithSparkSession { "../." + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") .getCanonicalPath - //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + //getCanonicalPath gives path with \, but the code expects /. 
writerPath = writerPath.replace("\\", "/"); val filePath = writerPath + "/Fact/Part0/Segment_null/" http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala index 934f5c7..697eec5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.net.URI +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration @@ -68,8 +69,23 @@ class SparkCarbonFileFormat extends FileFormat override def inferSchema(sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - val filePaths = CarbonUtil.getFilePathExternalFilePath( - options.get("path").get) + val filePaths = if (options.isEmpty) { + val carbondataFiles = files.seq.filter { each => + if (each.isFile) { + each.getPath.getName.contains(".carbondata") + } else { + false + } + } + + carbondataFiles.map { each => + each.getPath.toString + }.toList.asJava + } else { + CarbonUtil.getFilePathExternalFilePath( + options.get("path").get) + } + if (filePaths.size() == 0) { throw new SparkException("CarbonData file is not present in the location mentioned in DDL") } @@ -193,7 +209,11 @@ class SparkCarbonFileFormat extends FileFormat val fileSplit = new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty) - val path: String = 
options.get("path").get + val path: String = if (options.isEmpty) { + file.filePath + } else { + options.get("path").get + } val endindex: Int = path.indexOf("Fact") - 1 val tablePath = path.substring(0, endindex) lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(