http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala new file mode 100644 index 0000000..bfb9471 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.{File, FileFilter} + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.junit.Assert +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + // prepare SDK writer output + def buildTestData(persistSchema: Boolean, outputMultipleFiles: Boolean): Any = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true) + .uniqueIdentifier( + System.currentTimeMillis) + .buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true) + .uniqueIdentifier( + System.currentTimeMillis).withBlockSize(1).withBlockletSize(1) + .buildWriterForCSVInput() + } + var i = 0 + var row = 3 + if (outputMultipleFiles) { + row = 1000000 + } + while (i < row) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteFile(path: String, extension: String): Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteFile(eachDir.getPath, extension) + } + } + } + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + test("test create External Table with Schema with partition, should ignore schema and partition") + { + buildTestData(false, false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + // with partition + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("read unmanaged table, files written from sdk Writer Output)") { + buildTestData(false, false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable1") + + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable1 STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + checkAnswer(sql("select * from sdkOutputTable1"), Seq(Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + + checkAnswer(sql("select name from sdkOutputTable1"), Seq(Row("robot0"), + Row("robot1"), + Row("robot2"))) + + checkAnswer(sql("select age from sdkOutputTable1"), Seq(Row(0), Row(1), Row(2))) + + checkAnswer(sql("select * from sdkOutputTable1 where age > 1 and age < 8"), + Seq(Row("robot2", 2, 1.0))) + + checkAnswer(sql("select * from sdkOutputTable1 where name = 'robot2'"), + Seq(Row("robot2", 2, 1.0))) + + checkAnswer(sql("select * from sdkOutputTable1 where name like '%obot%' limit 2"), + Seq(Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5))) + + checkAnswer(sql("select sum(age) 
from sdkOutputTable1 where name like 'robot%'"), Seq(Row(3))) + + checkAnswer(sql("select count(*) from sdkOutputTable1 where name like 'robot%' "), Seq(Row(3))) + + checkAnswer(sql("select count(*) from sdkOutputTable1"), Seq(Row(3))) + + sql("DROP TABLE sdkOutputTable1") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("Test Blocked operations for unmanaged table ") { + buildTestData(false, false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + //1. alter datatype + var exception = intercept[MalformedCarbonCommandException] { + sql("Alter table sdkOutputTable change age age BIGINT") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //2. Load + exception = intercept[MalformedCarbonCommandException] { + sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE sdkOutputTable ") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //3. Datamap creation + exception = intercept[MalformedCarbonCommandException] { + sql( + "CREATE DATAMAP agg_sdkOutputTable ON TABLE sdkOutputTable USING \"preaggregate\" AS " + + "SELECT name, sum(age) FROM sdkOutputTable GROUP BY name,age") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //4. Insert Into + exception = intercept[MalformedCarbonCommandException] { + sql("insert into table sdkOutputTable SELECT 20,'robotX',2.5") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //5. compaction + exception = intercept[MalformedCarbonCommandException] { + sql("ALTER TABLE sdkOutputTable COMPACT 'MAJOR'") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //6. Show segments + exception = intercept[MalformedCarbonCommandException] { + sql("Show segments for table sdkOutputTable").show(false) + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //7. Delete segment by ID + exception = intercept[MalformedCarbonCommandException] { + sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.ID IN (0)") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //8. Delete segment by date + exception = intercept[MalformedCarbonCommandException] { + sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //9. Update Segment + exception = intercept[MalformedCarbonCommandException] { + sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false) + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //10. Delete Segment + exception = intercept[MalformedCarbonCommandException] { + sql("DELETE FROM sdkOutputTable where name='robot1'").show(false) + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //11. Show partition + exception = intercept[MalformedCarbonCommandException] { + sql("Show partitions sdkOutputTable").show(false) + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) + + //12. 
Streaming table creation + // External table don't accept table properties + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("test create External Table With Schema, should ignore the schema provided") { + buildTestData(false, false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + // with schema + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(age int) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("Read sdk writer output file without Carbondata file should fail") { + buildTestData(false, false) + deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[Exception] { + // data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + } + assert(exception.getMessage() + .contains("Operation not allowed: Invalid table path provided:")) + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + + test("Read sdk writer output file without any file should fail") { + buildTestData(false, false) + deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[Exception] { + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + sql("select * from sdkOutputTable").show(false) + } + assert(exception.getMessage() + .contains("Operation not allowed: Invalid table path provided:")) + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("Read sdk writer output multiple files ") { + buildTestData(false, true) + assert(new File(writerPath).exists()) + val folder = new File(writerPath) + val dataFiles = folder.listFiles(new FileFilter() { + override def accept(pathname: File): Boolean = { + pathname.getName + .endsWith(CarbonCommonConstants.FACT_FILE_EXT) + } + }) + Assert.assertNotNull(dataFiles) + Assert.assertNotEquals(1, dataFiles.length) + + sql("DROP TABLE IF EXISTS sdkOutputTable") + + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000))) + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + +}
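The TestUnmanagedCarbonTable suite above exercises the core contract of this change: files produced by the SDK CarbonWriter with unManagedTable(true) carry no Metadata folder, and a CREATE EXTERNAL TABLE ... STORED BY 'carbondata' statement pointed at that location infers the schema from the data files themselves, ignoring any schema or partition clause given in the DDL. A minimal sketch of that round trip, using only the builder calls visible in the test (the output path is hypothetical):

  import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

  val json = """[{"name":"string"}, {"age":"int"}, {"height":"double"}]"""
  val writer = CarbonWriter.builder()
    .withSchema(Schema.parseJson(json))
    .outputPath("/tmp/sdk-writer-output")   // hypothetical location
    .unManagedTable(true)                   // no Metadata folder is written
    .uniqueIdentifier(System.currentTimeMillis)
    .buildWriterForCSVInput()
  writer.write(Array("robot0", "0", "0.0")) // one CSV row per call
  writer.close()

  // Read back without copying the files into a managed store path:
  //   CREATE EXTERNAL TABLE t STORED BY 'carbondata' LOCATION '/tmp/sdk-writer-output'

The datamap test fixtures that follow are updated for the same commit's interface change: getDataMaps now takes a ReadCommittedScope alongside the Segment or DataMapDistributable, so blocklet pruning can resolve which files are visible for an unmanaged table that has no segment metadata to consult.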
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index b56bd6e..0c96247 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -38,6 +38,7 @@ import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.readcommitter.ReadCommittedScope import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression import org.apache.carbondata.core.scan.filter.intf.ExpressionType @@ -69,7 +70,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory { /** * Get the datamap for segmentid */ - override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = { + override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = { val file = FileFactory.getCarbonFile( CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) @@ -87,8 +88,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory { /** * Get datamaps for distributable object. 
*/ - override def getDataMaps( - distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = { + override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = { val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] val dataMap: CoarseGrainDataMap = new CGDataMap() dataMap.init(new DataMapModel(mapDistributable.getFilePath)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index bb8d7f8..270c676 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.readcommitter.ReadCommittedScope import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -53,9 +54,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ??? - override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ??? + override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ??? 
override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index 3bfc7b9..060d06a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -25,7 +25,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.core.datamap.dev.{DataMapModel} +import org.apache.carbondata.core.datamap.dev.DataMapModel import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory} import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter} @@ -40,6 +40,7 @@ import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.readcommitter.ReadCommittedScope import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression import org.apache.carbondata.core.scan.filter.intf.ExpressionType @@ -71,7 +72,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory { /** * Get the datamap for segmentid */ - override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] = { + override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap] = { val file = FileFactory .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) @@ -88,8 +89,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory { /** * Get datamap for distributable object. 
*/ - override def getDataMaps( - distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= { + override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap]= { val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] val dataMap: FineGrainDataMap = new FGDataMap() dataMap.init(new DataMapModel(mapDistributable.getFilePath)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index 32c10ef..be9cc51 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Se import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.readcommitter.ReadCommittedScope import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -183,9 +184,15 @@ class TestDataMap() extends CoarseGrainDataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable, + readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = { + ??? + } - override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ??? + override def getDataMaps(segment: Segment, + readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = { + ??? 
+ } override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = { new DataMapWriter(identifier, segment, writeDirectoryPath) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 1f4b7fe..65857b1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier} +import org.apache.carbondata.core.readcommitter.ReadCommittedScope import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -276,9 +277,9 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ??? - override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ??? + override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ??? 
override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = { new DataMapWriter(identifier, segment, writeDirectoryPath) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 8c433ea..d34d009 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -45,7 +45,7 @@ import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import org.apache.carbondata.core.metadata.schema.table.{TableInfo} import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel @@ -495,6 +495,8 @@ class CarbonScanRDD( if (partitionNames != null) { CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava) } + + CarbonInputFormat.setUnmanagedTable(conf, tableInfo.isUnManagedTable) createInputFormat(conf) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 605777d..22a0112 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -158,7 +158,7 @@ object PartitionUtils { .createCarbonTableInputFormat(identifier, partitionIds.asJava, job) CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) val splits = format.getSplitsOfOneSegment(job, segmentId, - oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo) + oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo) val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) val tableBlockInfoList = CarbonInputSplit.createBlocks(blockList.asJava) tableBlockInfoList http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index 5f78397..b9e2442 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -33,14 +33,14 @@ object CarbonSparkUtil { def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = { val 
dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getTableName) - .asScala.map(x => x.getColName) // wf : may be problem + .asScala.map(x => x.getColName) // wf : may be problem val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getTableName) - .asScala.map(x => x.getColName) + .asScala.map(x => x.getColName) val dictionary = carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f => (f.getColName.toLowerCase, - f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) && - !f.getDataType.isComplexType) + f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) && + !f.getDataType.isComplexType) } CarbonMetaData(dimensionsAttr, measureAttr, http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala index b27a150..e29986a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala @@ -76,6 +76,8 @@ case class CarbonCountStar( SparkHadoopUtil.get.addCredentials(jobConf) val job = new Job(jobConf) FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) + CarbonInputFormat + .setUnmanagedTable(job.getConfiguration, carbonTable.getTableInfo.isUnManagedTable) (job, carbonInputFormat) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 693b6c8..ef23926 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -299,7 +299,10 @@ object CarbonSource { tableDesc.copy(storage = updatedFormat) } else { val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava) - if (!metaStore.isReadFromHiveMetaStore) { + val isExternal = properties.getOrElse("isExternal", "false") + val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true") + tableInfo.setUnManagedTable(isUnManagedTable) + if (!isUnManagedTable && !metaStore.isReadFromHiveMetaStore) { // save to disk metaStore.saveToDisk(tableInfo, properties("tablePath")) // remove schema string from map as we don't store carbon schema to hive metastore @@ -320,16 +323,20 @@ object CarbonSource { properties: Map[String, String]): Map[String, String] = { val model = createTableInfoFromParams(properties, dataSchema, identifier) val tableInfo: TableInfo = TableNewProcessor(model) + val isExternal = properties.getOrElse("isExternal", "false") + val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true") + val tablePath = properties.getOrElse("path", "") tableInfo.setTablePath(identifier.getTablePath) + tableInfo.setUnManagedTable(isUnManagedTable) tableInfo.setDatabaseName(identifier.getDatabaseName) val schemaEvolutionEntry = new SchemaEvolutionEntry schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) 
tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) - val map = if (metaStore.isReadFromHiveMetaStore) { - CarbonUtil.convertToMultiStringMap(tableInfo) - } else { + val map = if (!metaStore.isReadFromHiveMetaStore && !isUnManagedTable) { metaStore.saveToDisk(tableInfo, identifier.getTablePath) new java.util.HashMap[String, String]() + } else { + CarbonUtil.convertToMultiStringMap(tableInfo) } properties.foreach(e => map.put(e._1, e._2)) map.put("tablepath", identifier.getTablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index a46d514..be5287f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -55,6 +55,11 @@ case class CarbonCreateDataMapCommand( CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession) case _ => null } + + if (mainTable != null && mainTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } + if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) { if (!ifNotExistsSet) { throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist") http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index bccc4dd..dc96399 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -77,6 +77,10 @@ case class CarbonAlterTableCompactionCommand( } relation.carbonTable } + if (table.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } + if (CarbonUtil.hasAggregationDataMap(table) || (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) { // If the compaction request is of 'streaming' type then we need to generate loadCommands http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala index 81427a1..57ccd82 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus} @@ -35,6 +36,10 @@ case class CarbonDeleteLoadByIdCommand( Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } + // if insert overwrite in progress, do not allow delete segment if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment") http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala index 1d76bda..7d0655d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus} @@ -36,6 +37,10 @@ case class CarbonDeleteLoadByLoadDateCommand( Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } + // if insert overwrite in progress, do not allow delete segment if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment") 
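From here on the pattern repeats across every segment-management and mutation command (data load, compaction, delete/update, show segments/partitions, datamap creation, and the alter-table paths in DDLStrategy): each command resolves the CarbonTable and rejects the operation up front when the table is unmanaged. The guard is the same few lines in each command; a representative sketch, with the surrounding command class elided:

  val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
  if (carbonTable.getTableInfo.isUnManagedTable) {
    throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
  }

This is the behavior asserted in TestUnmanagedCarbonTable above, which intercepts MalformedCarbonCommandException with the message "Unsupported operation on unmanaged table" for each blocked operation.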
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index f0a9a9e..3f86ca4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -153,6 +153,9 @@ case class CarbonLoadDataCommand( } else { null } + if (table.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } // get the value of 'spark.executor.cores' from spark conf, default value is 1 val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1") // get the value of 'carbon.number.of.cores.while.loading' from carbon properties, http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala index 1e5885e..1e65887 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.spark.sql.types.{StringType, TimestampType} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException case class CarbonShowLoadsCommand( databaseNameOp: Option[String], @@ -43,6 +44,9 @@ case class CarbonShowLoadsCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } CarbonStore.showSegments( limit, carbonTable.getMetadataPath http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index 230378b..ca9a6a1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} @@ -44,6 +45,10 @@ private[sql] case class CarbonProjectForDeleteCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } + if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data delete") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 2a92478..24ac80c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types.ArrayType import org.apache.spark.storage.StorageLevel +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment @@ -56,6 +57,9 @@ private[sql] case class CarbonProjectForUpdateCommand( return Seq.empty } val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data update") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index e0e51ed..65c6269 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -39,6 +39,7 @@ case class CarbonCreateTableCommand( tableInfo: TableInfo, ifNotExistsSet: Boolean = false, tableLocation: Option[String] = None, + isExternal : Boolean = false, createDSTable: Boolean = true, isVisible: Boolean = true) extends MetadataCommand { @@ -89,6 +90,7 @@ case class CarbonCreateTableCommand( OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier) + val isUnmanaged = tableInfo.isUnManagedTable if (createDSTable) { try { val tablePath = tableIdentifier.getTablePath @@ -128,6 +130,8 @@ case class CarbonCreateTableCommand( | dbName "$dbName", | tablePath "$tablePath", | path "$tablePath", + | isExternal "$isExternal", + | isUnManaged "$isUnmanaged", | isVisible "$isVisible" | $carbonSchemaString) | $partitionString http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala index 2daece3..788a820 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala @@ -44,6 +44,7 @@ import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope import org.apache.carbondata.core.reader.CarbonHeaderReader import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.logical.AndExpression @@ -222,6 +223,8 @@ class SparkCarbonFileFormat extends FileFormat val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null") + val readCommittedScope = new LatestFilesReadCommittedScope( + identifier.getTablePath + "/Fact/Part0/Segment_null/") val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath) if (indexFiles.size() == 0) { throw new SparkException("Index file not present to read the carbondata file") @@ -233,7 +236,7 @@ class SparkCarbonFileFormat extends FileFormat .choose(tab, model.getFilterExpressionResolverTree) // TODO : handle the partition for CarbonFileLevelFormat - val prunedBlocklets = dataMapExprWrapper.prune(segments, null) + val prunedBlocklets = dataMapExprWrapper.prune(segments, null, readCommittedScope) val detailInfo = prunedBlocklets.get(0).getDetailInfo detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary) http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index a5a96af..ad3ad2e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -117,6 +117,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") + } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") } else { ExecutedCommandExec(dataTypeChange) :: Nil } @@ -133,6 +135,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") + } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") } else { ExecutedCommandExec(addColumn) :: Nil } @@ -149,6 +153,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") + } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") } else { ExecutedCommandExec(dropColumn) :: Nil } @@ -181,6 +187,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (isCarbonTable) { val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable + if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } if (!carbonTable.isHivePartitionTable) { ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil } else { @@ -231,6 +240,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { // if the table has 'preaggregate' DataMap, it doesn't support streaming now val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable + if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } // TODO remove this limitation later val property = properties.find(_._1.equalsIgnoreCase("streaming")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 7a8601a..0075c13 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import java.io.IOException import java.net.URI -import java.util.UUID import scala.collection.mutable.ArrayBuffer @@ -43,11 +42,10 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema -import org.apache.carbondata.core.metadata.schema.table -import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.{table, SchemaReader} +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -105,7 +103,8 @@ class CarbonFileMetastore extends CarbonMetaStore { case Some(t) => CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t) case None => - readCarbonSchema(absIdentifier) match { + readCarbonSchema(absIdentifier, + parameters.getOrElse("isUnManaged", "false").toBoolean) match { case Some(meta) => CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(meta), meta) @@ -207,25 +206,51 @@ class CarbonFileMetastore extends CarbonMetaStore { true } - private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[CarbonTable] = { + def isTableInMetastore(identifier: AbsoluteTableIdentifier, + sparkSession: SparkSession): Boolean = { + sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName) + .exists(_.table.equalsIgnoreCase(identifier.getTableName)) + } + + + private def readCarbonSchema(identifier: AbsoluteTableIdentifier, + inferSchema: Boolean): Option[CarbonTable] = { + + val schemaConverter = new ThriftWrapperSchemaConverterImpl val dbName = identifier.getCarbonTableIdentifier.getDatabaseName val tableName = identifier.getCarbonTableIdentifier.getTableName + val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName) val tablePath = identifier.getTablePath - val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath) - val fileType = FileFactory.getFileType(tableMetadataFile) - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName) - val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) - val schemaConverter = new ThriftWrapperSchemaConverterImpl + val wrapperTableInfo = + if (inferSchema) { + val thriftTableInfo = schemaConverter + .fromWrapperToExternalTableInfo(SchemaReader.inferSchema(identifier, false), + dbName, tableName) val wrapperTableInfo = - schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) + schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath) + wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true") + wrapperTableInfo.setUnManagedTable(true) + Some(wrapperTableInfo) + } else { + val tableMetadataFile = 
CarbonTablePath.getSchemaFilePath(tablePath) + val fileType = FileFactory.getFileType(tableMetadataFile) + if (FileFactory.isFileExist(tableMetadataFile, fileType)) { + val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) + val wrapperTableInfo = + schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) + Some(wrapperTableInfo) + } else { + None + } + } + + wrapperTableInfo.map { tableInfo => CarbonMetadata.getInstance().removeTable(tableUniqueName) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) + CarbonMetadata.getInstance().loadTableMetadata(tableInfo) val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) metadata.carbonTables += carbonTable - Some(carbonTable) - } else { - None + carbonTable } } @@ -448,6 +473,34 @@ class CarbonFileMetastore extends CarbonMetaStore { val tableIdentifier = TableIdentifier(tableName, Option(dbName)) sparkSession.sessionState.catalog.refreshTable(tableIdentifier) DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) + } else { + if (isUnmanagedCarbonTable(absoluteTableIdentifier, sparkSession)) { + removeTableFromMetadata(dbName, tableName) + CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) + // discard cached table info in cachedDataSourceTables + val tableIdentifier = TableIdentifier(tableName, Option(dbName)) + sparkSession.sessionState.catalog.refreshTable(tableIdentifier) + DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) + } + } + } + + + def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier, + sparkSession: SparkSession): Boolean = { + if (sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName) + .exists(_.table.equalsIgnoreCase(identifier.getTableName))) { + + val table = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .getCarbonEnv().carbonMetastore + .getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName) + + table match { + case null => false + case _ => table.get.getTableInfo.isUnManagedTable + } + } else { + false } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 7b464f2..33eac61 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -258,6 +258,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, } // validate tblProperties val bucketFields = parser.getBucketFields(tableProperties, fields, options) + var unManagedTable : Boolean = false val tableInfo = if (external) { // read table info from schema file in the provided table path @@ -267,9 +268,13 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, tableIdentifier.table) val table = try { val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) - if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) && - provider.equalsIgnoreCase("'carbonfile'")) { - SchemaReader.inferSchema(identifier) + if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { + if 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 7b464f2..33eac61 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -258,6 +258,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+    var unManagedTable: Boolean = false
     val tableInfo = if (external) {
       // read table info from schema file in the provided table path
@@ -267,9 +268,13 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableIdentifier.table)
       val table = try {
         val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
-        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
-            provider.equalsIgnoreCase("'carbonfile'")) {
-          SchemaReader.inferSchema(identifier)
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+          if (provider.equalsIgnoreCase("'carbonfile'")) {
+            SchemaReader.inferSchema(identifier, true)
+          } else {
+            unManagedTable = true
+            SchemaReader.inferSchema(identifier, false)
+          }
         } else {
           SchemaReader.getTableInfo(identifier)
@@ -302,6 +307,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableComment)
       TableNewProcessor(tableModel)
     }
+    tableInfo.setUnManagedTable(unManagedTable)
     selectQuery match {
       case query@Some(q) =>
         CarbonCreateTableAsSelectCommand(
@@ -313,7 +319,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
       CarbonCreateTableCommand(
         tableInfo = tableInfo,
         ifNotExistsSet = ifNotExists,
-        tableLocation = tablePath)
+        tableLocation = tablePath,
+        external)
     }
   }
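A hedged summary of the new branching, as DDL. Assuming the provider string reaches the builder in quoted form (as the equalsIgnoreCase("'carbonfile'") comparison suggests), the decision for an external table is:

    // Schema file present at LOCATION -> SchemaReader.getTableInfo(identifier), managed as before.
    // No schema file at LOCATION:
    sql(s"CREATE EXTERNAL TABLE t1 STORED BY 'carbonfile' LOCATION '$writerPath'")
    //   -> SchemaReader.inferSchema(identifier, true)
    sql(s"CREATE EXTERNAL TABLE t2 STORED BY 'carbondata' LOCATION '$writerPath'")
    //   -> SchemaReader.inferSchema(identifier, false), and tableInfo.setUnManagedTable(true)
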
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index c21c57f..829c17e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -110,6 +110,8 @@ public class CarbonDataLoadConfiguration {
   private SortColumnRangeInfo sortColumnRangeInfo;
 
+  private boolean carbonUnmanagedTable;
+
   /**
    * Folder path to where data should be written for this load.
    */
@@ -377,4 +379,12 @@ public class CarbonDataLoadConfiguration {
   public void setSortColumnRangeInfo(SortColumnRangeInfo sortColumnRangeInfo) {
     this.sortColumnRangeInfo = sortColumnRangeInfo;
   }
+
+  public boolean isCarbonUnmanagedTable() {
+    return carbonUnmanagedTable;
+  }
+
+  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
+    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 21a4b3e..6d3f596 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -185,6 +185,7 @@ public final class DataLoadProcessBuilder {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
     configuration.setTableIdentifier(identifier);
+    configuration.setCarbonUnmanagedTable(loadModel.isCarbonUnmanagedTable());
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
@@ -214,6 +215,7 @@ public final class DataLoadProcessBuilder {
         loadModel.getGlobalSortPartitions());
     configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
         loadModel.getBadRecordsLocation());
+    CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index ebf3cf1..fd39563 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -47,6 +47,13 @@ public class CarbonLoadModel implements Serializable {
 
   private String tablePath;
 
+  /*
+   * Indicates whether the carbonTable is an unmanaged table. Such a table lives
+   * directly at tablePath, and no Metadata folder is created for it.
+   */
+  private boolean carbonUnmanagedTable;
+
   private String csvHeader;
   private String[] csvHeaderColumns;
   private String csvDelimiter;
@@ -410,6 +417,7 @@ public class CarbonLoadModel implements Serializable {
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
     copy.tablePath = tablePath;
+    copy.carbonUnmanagedTable = carbonUnmanagedTable;
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
@@ -463,6 +471,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.tablePath = tablePath;
+    copyObj.carbonUnmanagedTable = carbonUnmanagedTable;
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
@@ -825,4 +834,12 @@ public class CarbonLoadModel implements Serializable {
     LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
     setLoadMetadataDetails(Arrays.asList(details));
   }
+
+  public boolean isCarbonUnmanagedTable() {
+    return carbonUnmanagedTable;
+  }
+
+  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
+    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 8c005c3..8eb5ed1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -60,7 +60,7 @@ public class CarbonLoadModelBuilder {
    * @return a new CarbonLoadModel instance
    */
   public CarbonLoadModel build(
-      Map<String, String> options) throws InvalidLoadOptionException, IOException {
+      Map<String, String> options, long UUID) throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
 
     if (!options.containsKey("fileheader")) {
@@ -72,10 +72,13 @@ public class CarbonLoadModelBuilder {
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
     }
     CarbonLoadModel model = new CarbonLoadModel();
+    model.setCarbonUnmanagedTable(table.isUnManagedTable());
+    model.setFactTimeStamp(UUID);
 
     // we have provided 'fileheader', so hadoopConf can be null
     build(options, optionsFinal, model, null);
+
     // set default values
     model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
     model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
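Why build() now takes a UUID: an SDK write has no segment manager, so the caller-supplied identifier is stored as the load's fact timestamp to keep successive writes into the same path distinguishable. A minimal Scala sketch, illustrative only (`table` stands for a CarbonTable built as in buildCarbonTable further below, and the empty options map stands in for real load options):

    import java.util.HashMap
    val uuid = System.currentTimeMillis
    val model = new CarbonLoadModelBuilder(table).build(new HashMap[String, String](), uuid)
    // model.isCarbonUnmanagedTable mirrors table.isUnManagedTable, and uuid becomes
    // factTimeStamp which, if the usual naming applies, is embedded in the generated
    // .carbondata file names.
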
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index e605b9e..089b8c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -244,8 +244,9 @@ public class LoadOption {
       }
     }
 
-    if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
+    if (!carbonLoadModel.isCarbonUnmanagedTable() && !CarbonDataProcessorUtil
+        .isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
+            carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {
         LOG.error("CSV header in DDL is not proper."
             + " Column names in schema and CSV header are not the same.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 1d892e0..0625a66 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -366,8 +366,14 @@ public class CarbonFactDataHandlerModel {
       return paths;
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
-    String carbonDataDirectoryPath = CarbonTablePath
-        .getSegmentPath(absoluteTableIdentifier.getTablePath(), configuration.getSegmentId() + "");
+    String carbonDataDirectoryPath;
+    if (configuration.isCarbonUnmanagedTable()) {
+      carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath();
+    } else {
+      carbonDataDirectoryPath = CarbonTablePath
+          .getSegmentPath(absoluteTableIdentifier.getTablePath(),
+              configuration.getSegmentId() + "");
+    }
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }
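The CarbonFactDataHandlerModel change is the on-disk consequence of "unmanaged": managed loads keep writing under a per-segment directory, while unmanaged loads write the data files directly at the table path. A small sketch using the same CarbonTablePath call (paths illustrative, not from the patch):

    import org.apache.carbondata.core.util.path.CarbonTablePath
    val tablePath = "/tmp/WriterOutput"
    val managedDir = CarbonTablePath.getSegmentPath(tablePath, "0") // per-segment folder
    val unmanagedDir = tablePath // part-*.carbondata files land here, no segment folders
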
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 210d516..716ec2a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -84,8 +84,8 @@ public class CarbonReader<T> {
   /**
    * Return a new {@link CarbonReaderBuilder} instance
    */
-  public static CarbonReaderBuilder builder(String tablePath) {
-    return new CarbonReaderBuilder(tablePath);
+  public static CarbonReaderBuilder builder(String tablePath, String tableName) {
+    return new CarbonReaderBuilder(tablePath, tableName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 894b973..7f00b49 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -45,9 +45,11 @@ public class CarbonReaderBuilder {
   private String tablePath;
   private String[] projectionColumns;
   private Expression filterExpression;
+  private String tableName;
 
-  CarbonReaderBuilder(String tablePath) {
+  CarbonReaderBuilder(String tablePath, String tableName) {
     this.tablePath = tablePath;
+    this.tableName = tableName;
   }
 
   public CarbonReaderBuilder projection(String[] projectionColumnNames) {
@@ -63,7 +65,7 @@ public class CarbonReaderBuilder {
   }
 
   public <T> CarbonReader<T> build() throws IOException, InterruptedException {
-    CarbonTable table = CarbonTable.buildFromTablePath("_temp", tablePath);
+    CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath);
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
     final Job job = new Job(new Configuration());
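With the builder signature change, reading SDK output now names the table explicitly instead of relying on the hard-coded "_temp". A hedged sketch of reader usage (the table name is arbitrary; the hasNext/readNextRow iteration is assumed from the reader's usual API and is not part of this patch):

    val reader: CarbonReader[Array[AnyRef]] = CarbonReader
      .builder(writerPath, "any_table_name")
      .projection(Array("name", "age"))
      .build()
    while (reader.hasNext) {
      println(reader.readNextRow.mkString(","))
    }
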
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 5be60c4..f70e165 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -55,6 +55,8 @@ public class CarbonWriterBuilder {
   private boolean persistSchemaFile;
   private int blockletSize;
   private int blockSize;
+  private boolean isUnManagedTable;
+  private long UUID;
 
   public CarbonWriterBuilder withSchema(Schema schema) {
     Objects.requireNonNull(schema, "schema should not be null");
@@ -82,9 +84,21 @@ public class CarbonWriterBuilder {
     return this;
   }
 
+  // both parameters below are primitives and can never be null, so no null check is done
+  public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) {
+    this.isUnManagedTable = isUnManagedTable;
+    return this;
+  }
+
+  public CarbonWriterBuilder uniqueIdentifier(long UUID) {
+    this.UUID = UUID;
+    return this;
+  }
+
   public CarbonWriterBuilder withBlockSize(int blockSize) {
-    if (blockSize <= 0) {
-      throw new IllegalArgumentException("blockSize should be greater than zero");
+    if (blockSize <= 0 || blockSize > 2048) {
+      throw new IllegalArgumentException("blockSize should be between 1 MB and 2048 MB");
     }
     this.blockSize = blockSize;
     return this;
@@ -129,7 +143,7 @@ public class CarbonWriterBuilder {
     }
 
     // build LoadModel
-    return buildLoadModel(table);
+    return buildLoadModel(table, UUID);
   }
 
   /**
@@ -152,8 +166,15 @@ public class CarbonWriterBuilder {
           new StructField(field.getFieldName(), field.getDataType()),
           sortColumnsList.contains(field.getFieldName()));
     }
-    String tableName = "_tempTable";
-    String dbName = "_tempDB";
+    String tableName;
+    String dbName;
+    if (!isUnManagedTable) {
+      tableName = "_tempTable";
+      dbName = "_tempDB";
+    } else {
+      dbName = null;
+      tableName = null;
+    }
     TableSchema schema = tableSchemaBuilder.build();
     schema.setTableName(tableName);
     CarbonTable table = CarbonTable.builder()
@@ -161,6 +182,7 @@ public class CarbonWriterBuilder {
         .databaseName(dbName)
         .tablePath(path)
         .tableSchema(schema)
+        .isUnManagedTable(isUnManagedTable)
         .build();
     return table;
   }
@@ -198,13 +220,13 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table)
+  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID)
       throws InvalidLoadOptionException, IOException {
     Map<String, String> options = new HashMap<>();
     if (sortColumns != null) {
       options.put("sort_columns", Strings.mkString(sortColumns, ","));
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    return builder.build(options);
+    return builder.build(options, UUID);
   }
 }
\ No newline at end of file
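The test suite that follows exercises exactly this builder surface; for orientation, a minimal sketch of an unmanaged write using the new calls (schema and writerPath as in the earlier test data; not part of the patch):

    val writer = CarbonWriter.builder()
      .withSchema(Schema.parseJson(schema))
      .outputPath(writerPath)
      .unManagedTable(true)                        // no Metadata folder, no segment dirs
      .uniqueIdentifier(System.currentTimeMillis)  // becomes the load's fact timestamp
      .withBlockSize(2)                            // now validated to the range 1..2048 MB
      .buildWriterForCSVInput()
    writer.write(Array("robot0", "0", "0.0"))
    writer.close()
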
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
new file mode 100644
index 0000000..4bcdfff
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link CSVCarbonWriter} writing unmanaged table output
+ */
+public class CSVUnManagedCarbonWriterTest {
+
+  @Test
+  public void testWriteFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testWriteFilesJsonSchema() throws IOException {
+    String path = "./testWriteFilesJsonSchema";
+    FileUtils.deleteDirectory(new File(path));
+
+    String schema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"}\n")
+        .append("]")
+        .toString();
+
+    writeFilesAndVerify(Schema.parseJson(schema), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+  }
+
+  /**
+   * Invoke the CarbonWriter API to write carbon files and verify that the files were written
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true if the schema file should be persisted
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   */
+  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize) {
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(schema)
+          .unManagedTable(true)
+          .uniqueIdentifier(System.currentTimeMillis())
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i),
+            String.valueOf((double) i / 2)});
+      }
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+  }
+
+  @Test
+  public void testAllPrimitiveDataType() throws IOException {
+    // TODO: write all data types and read by CarbonRecordReader to verify the content
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(new Schema(fields))
+          .uniqueIdentifier(System.currentTimeMillis())
+          .unManagedTable(true)
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            "12.35"   // decimalField value, so the row covers all nine columns
+        };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Blocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+
+    // TODO: implement a reader to verify the number of blocklets in the file
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Block() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+
+    File segmentFolder = new File(path);
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(2, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testSortColumns() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+    // TODO: implement a reader and verify that the data is sorted
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testPartitionOutput() {
+    // TODO: test write data with partition
+  }
+
+  @Test
+  public void testSchemaPersistence() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+    Assert.assertTrue(new File(schemaFile).exists());
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}
