Repository: carbondata Updated Branches: refs/heads/master b86ff926d -> b7b8073d6
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 3f86ca4..29d91d6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -153,9 +153,7 @@ case class CarbonLoadDataCommand( } else { null } - if (table.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") - } + // get the value of 'spark.executor.cores' from spark conf, default value is 1 val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1") // get the value of 'carbon.number.of.cores.while.loading' from carbon properties, @@ -192,6 +190,7 @@ case class CarbonLoadDataCommand( FileUtils.getPaths(factPathFromUser, hadoopConf) } carbonLoadModel.setFactFilePath(factPath) + carbonLoadModel.setCarbonTransactionalTable(table.getTableInfo.isTransactionalTable) carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) @@ -267,10 +266,12 @@ case class CarbonLoadDataCommand( carbonLoadModel.setUseOnePass(false) } // Create table and metadata folders if not exist - val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) - val fileType = FileFactory.getFileType(metadataDirectoryPath) - if (!FileFactory.isFileExist(metadataDirectoryPath, 
fileType)) { - FileFactory.mkdirs(metadataDirectoryPath, fileType) + if (carbonLoadModel.isCarbonTransactionalTable) { + val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) + val fileType = FileFactory.getFileType(metadataDirectoryPath) + if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { + FileFactory.mkdirs(metadataDirectoryPath, fileType) + } } val partitionStatus = SegmentStatus.SUCCESS val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala index cd5d8f9..0fb05e0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala @@ -55,8 +55,8 @@ case class CarbonShowLoadsCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - if (carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + if (!carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } CarbonStore.showSegments( limit, 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index 1b087bd..225237b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -44,8 +44,8 @@ private[sql] case class CarbonProjectForDeleteCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - if (carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + if (!carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 24ac80c..d8379a7 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -57,8 +57,8 @@ private[sql] case class CarbonProjectForUpdateCommand( return Seq.empty } val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - if (carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + if (!carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data update") http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index 59ec71c..807c925 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -123,6 +123,7 @@ case class CarbonAlterTableAddHivePartitionCommand( "Schema of index files located in location is not matching with current table schema") } val loadModel = new CarbonLoadModel + loadModel.setCarbonTransactionalTable(true) loadModel.setCarbonDataLoadSchema(new 
CarbonDataLoadSchema(table)) // Create new entry in tablestatus file CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false) http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index 756bc97..25c0559 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -138,6 +138,7 @@ case class CarbonAlterTableDropPartitionCommand( // Need to fill dimension relation carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) carbonLoadModel.setTableName(table.getTableName) + carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable) carbonLoadModel.setDatabaseName(table.getDatabaseName) carbonLoadModel.setTablePath(table.getTablePath) val loadStartTime = CarbonUpdateUtil.readCurrentTime http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index 929de0a..f4b6de0 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -149,6 +149,7 @@ case class CarbonAlterTableSplitPartitionCommand( carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) carbonLoadModel.setTableName(table.getTableName) carbonLoadModel.setDatabaseName(table.getDatabaseName) + carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable) carbonLoadModel.setTablePath(tablePath) val loadStartTime = CarbonUpdateUtil.readCurrentTime carbonLoadModel.setFactTimeStamp(loadStartTime) http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index af52d6b..dfcd12b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -77,8 +77,8 @@ private[sql] case class CarbonAlterTableRenameCommand( var oldCarbonTable: CarbonTable = null oldCarbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) .asInstanceOf[CarbonRelation].carbonTable - if (oldCarbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + if (!oldCarbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException("Unsupported operation on 
non transactional table") } val locksToBeAcquired = List(LockUsage.METADATA_LOCK, http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 6266c53..16e99b5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -90,7 +90,7 @@ case class CarbonCreateTableCommand( OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier) - val isUnmanaged = tableInfo.isUnManagedTable + val isTransactionalTable = tableInfo.isTransactionalTable if (createDSTable) { try { val tablePath = tableIdentifier.getTablePath @@ -133,7 +133,7 @@ case class CarbonCreateTableCommand( | tablePath "$tablePath", | path "$tablePath", | isExternal "$isExternal", - | isUnManaged "$isUnmanaged", + | isTransactional "$isTransactionalTable", | isVisible "$isVisible" | $carbonSchemaString) | $partitionString http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 53e1ed4..61df9b1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -50,15 +50,22 @@ case class CarbonDropTableCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) + + val identifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession) val dbName = identifier.getCarbonTableIdentifier.getDatabaseName val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { + carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + val locksToBeAcquired: List[String] = if (carbonTable.isTransactionalTable) { + List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) + } else { + List.empty + } locksToBeAcquired foreach { lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock) } - carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "drop table") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index c58d02d..26d5330 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -115,8 +115,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") - } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException( + "Unsupported operation on non transactional table") } else { ExecutedCommandExec(dataTypeChange) :: Nil } @@ -133,8 +134,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") - } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException( + "Unsupported operation on non transactional table") } else { ExecutedCommandExec(addColumn) :: Nil } @@ -151,8 +153,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") - } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } else if (carbonTable != null && 
!carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException( + "Unsupported operation on non transactional table") } else { ExecutedCommandExec(dropColumn) :: Nil } @@ -185,8 +188,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (isCarbonTable) { val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable - if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException( + "Unsupported operation on non transactional table") } if (!carbonTable.isHivePartitionTable) { ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil @@ -238,8 +242,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { // if the table has 'preaggregate' DataMap, it doesn't support streaming now val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable - if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) { - throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException( + "Unsupported operation on non transactional table") } // TODO remove this limitation later http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 36b6d96..c61471a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -104,7 +104,7 @@ class CarbonFileMetastore extends CarbonMetaStore { CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t) case None => readCarbonSchema(absIdentifier, - parameters.getOrElse("isUnManaged", "false").toBoolean) match { + !parameters.getOrElse("isTransactional", "true").toBoolean) match { case Some(meta) => CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(meta), meta) @@ -230,7 +230,7 @@ class CarbonFileMetastore extends CarbonMetaStore { schemaConverter .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath) wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true") - wrapperTableInfo.setUnManagedTable(true) + wrapperTableInfo.setTransactionalTable(false) Some(wrapperTableInfo) } else { val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath) @@ -474,7 +474,7 @@ class CarbonFileMetastore extends CarbonMetaStore { sparkSession.sessionState.catalog.refreshTable(tableIdentifier) DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) } else { - if (isUnmanagedCarbonTable(absoluteTableIdentifier)) { + if (!isTransactionalCarbonTable(absoluteTableIdentifier)) { removeTableFromMetadata(dbName, tableName) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables @@ -486,9 +486,9 @@ class CarbonFileMetastore extends CarbonMetaStore { } - def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = { - val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName); - table.map(_.getTableInfo.isUnManagedTable).getOrElse(false) + def 
isTransactionalCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = { + val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName) + table.map(_.getTableInfo.isTransactionalTable).getOrElse(true) } private def getTimestampFileAndType() = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 33eac61..aacbdd0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -258,7 +258,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, } // validate tblProperties val bucketFields = parser.getBucketFields(tableProperties, fields, options) - var unManagedTable : Boolean = false + var isTransactionalTable : Boolean = true val tableInfo = if (external) { // read table info from schema file in the provided table path @@ -272,7 +272,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, if (provider.equalsIgnoreCase("'carbonfile'")) { SchemaReader.inferSchema(identifier, true) } else { - unManagedTable = true + isTransactionalTable = false SchemaReader.inferSchema(identifier, false) } } @@ -307,7 +307,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, tableComment) TableNewProcessor(tableModel) } - tableInfo.setUnManagedTable(unManagedTable) + tableInfo.setTransactionalTable(isTransactionalTable) selectQuery match { case query@Some(q) => CarbonCreateTableAsSelectCommand( http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index 43d8c03..d98229a 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -158,6 +158,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft val carbonSchema = new CarbonDataLoadSchema(table) carbonLoadModel.setDatabaseName(table.getDatabaseName) carbonLoadModel.setTableName(table.getTableName) + carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable) carbonLoadModel.setTablePath(relation.carbonTable.getTablePath) carbonLoadModel.setCarbonDataLoadSchema(carbonSchema) carbonLoadModel.setFactFilePath(filePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index 829c17e..ad1c84c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -110,7 +110,7 @@ public class CarbonDataLoadConfiguration { private SortColumnRangeInfo sortColumnRangeInfo; - private boolean carbonUnmanagedTable; + private boolean carbonTransactionalTable; /** * Flder path to where data should be written for this load. 
@@ -380,11 +380,11 @@ public class CarbonDataLoadConfiguration { this.sortColumnRangeInfo = sortColumnRangeInfo; } - public boolean isCarbonUnmanagedTable() { - return carbonUnmanagedTable; + public boolean isCarbonTransactionalTable() { + return carbonTransactionalTable; } - public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) { - this.carbonUnmanagedTable = carbonUnmanagedTable; + public void setCarbonTransactionalTable(boolean carbonTransactionalTable) { + this.carbonTransactionalTable = carbonTransactionalTable; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 6d3f596..9c1d113 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -185,7 +185,7 @@ public final class DataLoadProcessBuilder { CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); configuration.setTableIdentifier(identifier); - configuration.setCarbonUnmanagedTable(loadModel.isCarbonUnmanagedTable()); + configuration.setCarbonTransactionalTable(loadModel.isCarbonTransactionalTable()); configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime()); configuration.setHeader(loadModel.getCsvHeaderColumns()); configuration.setSegmentId(loadModel.getSegmentId()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java 
---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index fd39563..2b820d8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -48,11 +48,11 @@ public class CarbonLoadModel implements Serializable { private String tablePath; /* - This points if the carbonTable is a Unmanaged Table or not. + This points if the carbonTable is a Non Transactional Table or not. The path will be pointed by the tablePath. And there will be - no Metadata folder present for the unmanaged Table. + no Metadata folder present for the Non Transactional Table. */ - private boolean carbonUnmanagedTable; + private boolean carbonTransactionalTable = true; private String csvHeader; private String[] csvHeaderColumns; @@ -417,7 +417,7 @@ public class CarbonLoadModel implements Serializable { copy.defaultTimestampFormat = defaultTimestampFormat; copy.maxColumns = maxColumns; copy.tablePath = tablePath; - copy.carbonUnmanagedTable = carbonUnmanagedTable; + copy.carbonTransactionalTable = carbonTransactionalTable; copy.useOnePass = useOnePass; copy.dictionaryServerHost = dictionaryServerHost; copy.dictionaryServerPort = dictionaryServerPort; @@ -471,7 +471,7 @@ public class CarbonLoadModel implements Serializable { copyObj.defaultTimestampFormat = defaultTimestampFormat; copyObj.maxColumns = maxColumns; copyObj.tablePath = tablePath; - copyObj.carbonUnmanagedTable = carbonUnmanagedTable; + copyObj.carbonTransactionalTable = carbonTransactionalTable; copyObj.useOnePass = useOnePass; copyObj.dictionaryServerHost = dictionaryServerHost; copyObj.dictionaryServerPort = dictionaryServerPort; @@ -835,11 +835,11 @@ public class 
CarbonLoadModel implements Serializable { setLoadMetadataDetails(Arrays.asList(details)); } - public boolean isCarbonUnmanagedTable() { - return carbonUnmanagedTable; + public boolean isCarbonTransactionalTable() { + return carbonTransactionalTable; } - public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) { - this.carbonUnmanagedTable = carbonUnmanagedTable; + public void setCarbonTransactionalTable(boolean carbonTransactionalTable) { + this.carbonTransactionalTable = carbonTransactionalTable; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 8eb5ed1..3385479 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -57,10 +57,11 @@ public class CarbonLoadModelBuilder { /** * build CarbonLoadModel for data loading * @param options Load options from user input + * @param taskNo * @return a new CarbonLoadModel instance */ - public CarbonLoadModel build( - Map<String, String> options, long UUID) throws InvalidLoadOptionException, IOException { + public CarbonLoadModel build(Map<String, String> options, long UUID, String taskNo) + throws InvalidLoadOptionException, IOException { Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options); if (!options.containsKey("fileheader")) { @@ -72,8 +73,9 @@ public class CarbonLoadModelBuilder { optionsFinal.put("fileheader", Strings.mkString(columns, ",")); } CarbonLoadModel model = new CarbonLoadModel(); - 
model.setCarbonUnmanagedTable(table.isUnManagedTable()); + model.setCarbonTransactionalTable(table.isTransactionalTable()); model.setFactTimeStamp(UUID); + model.setTaskNo(taskNo); // we have provided 'fileheader', so it hadoopConf can be null build(options, optionsFinal, model, null); @@ -129,6 +131,7 @@ public class CarbonLoadModelBuilder { carbonLoadModel.setDatabaseName(table.getDatabaseName()); carbonLoadModel.setTablePath(table.getTablePath()); carbonLoadModel.setTableName(table.getTableName()); + carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable()); CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table); // Need to fill dimension relation carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema); http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java index 089b8c7..4ff1cce 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -244,7 +244,7 @@ public class LoadOption { } } - if (!carbonLoadModel.isCarbonUnmanagedTable() && !CarbonDataProcessorUtil + if (carbonLoadModel.isCarbonTransactionalTable() && !CarbonDataProcessorUtil .isHeaderValid(carbonLoadModel.getTableName(), csvColumns, carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) { if (csvFile == null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 0625a66..aaf20c7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -367,7 +367,7 @@ public class CarbonFactDataHandlerModel { } AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); String carbonDataDirectoryPath; - if (configuration.isCarbonUnmanagedTable()) { + if (!configuration.isCarbonTransactionalTable()) { carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath(); } else { carbonDataDirectoryPath = CarbonTablePath http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 2b4748f..2b295d6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -159,6 +159,30 @@ public final class CarbonLoaderUtil { } /** + * This API deletes the content of the non Transactional Tables when insert overwrite is set true. + * + * @param loadModel + * @throws IOException + */ + public static void deleteNonTransactionalTableForInsertOverwrite(final CarbonLoadModel loadModel) + throws IOException { + // We need to delete the content of the Table Path Folder except the + // Newly added file. 
+ List<String> filesToBeDeleted = new ArrayList<>(); + CarbonFile carbonFile = FileFactory.getCarbonFile(loadModel.getTablePath()); + CarbonFile[] filteredList = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return !file.getName().contains(loadModel.getFactTimeStamp() + ""); + } + }); + for (CarbonFile file : filteredList) { + filesToBeDeleted.add(file.getAbsolutePath()); + } + + deleteFiles(filesToBeDeleted); + } + + /** * This API will write the load level metadata for the loadmanagement module inorder to * manage the load and query execution management smoothly. * @@ -169,8 +193,13 @@ * @throws IOException */ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry, - CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid) + final CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid) throws IOException { + // For Non Transactional tables no need to update the Table Status file. 
+ if (!loadModel.isCarbonTransactionalTable()) { + return true; + } + return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, uuid, new ArrayList<Segment>(), new ArrayList<Segment>()); } @@ -191,10 +220,12 @@ public final class CarbonLoaderUtil { boolean status = false; AbsoluteTableIdentifier identifier = loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath()); - FileType fileType = FileFactory.getFileType(metadataPath); - if (!FileFactory.isFileExist(metadataPath, fileType)) { - FileFactory.mkdirs(metadataPath, fileType); + if (loadModel.isCarbonTransactionalTable()) { + String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath()); + FileType fileType = FileFactory.getFileType(metadataPath); + if (!FileFactory.isFileExist(metadataPath, fileType)) { + FileFactory.mkdirs(metadataPath, fileType); + } } String tableStatusPath; if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) { @@ -432,6 +463,10 @@ public final class CarbonLoaderUtil { } CarbonLoaderUtil .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false); + + if (!model.isCarbonTransactionalTable() && insertOverwrite) { + CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(model); + } boolean entryAdded = CarbonLoaderUtil .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid); if (!entryAdded) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index aae6f03..16d4d53 100644 --- 
a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -136,6 +136,7 @@ loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); + loadModel.setCarbonTransactionalTable(true); loadModel.setFactFilePath(factFilePath); loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); loadModel.setTablePath(identifier.getTablePath()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 4e09553..de1e5be 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -55,8 +55,9 @@ public class CarbonWriterBuilder { private boolean persistSchemaFile; private int blockletSize; private int blockSize; - private boolean isUnManagedTable; + private boolean isTransactionalTable; private long UUID; + private String taskNo; /** * prepares the builder with the schema provided * @@ -91,6 +92,19 @@ } /** + * sets the taskNo for the writer. SDKs concurrently running + * will set taskNo in order to avoid conflicts in file write. + * @param taskNo is the TaskNo user wants to specify. Mostly it is system time. 
+ * @return updated CarbonWriterBuilder + */ + public CarbonWriterBuilder taskNo(String taskNo) { + this.taskNo = taskNo; + return this; + } + + + + /** * If set, create a schema file in metadata folder. * @param persist is a boolean value, If set, create a schema file in metadata folder * @return updated CarbonWriterBuilder @@ -101,14 +115,14 @@ } /** - * If set true, writes the carbondata and carbonindex files in a flat folder structure - * @param isUnManagedTable is a boolelan value if set writes + * If set false, writes the carbondata and carbonindex files in a flat folder structure + * @param isTransactionalTable is a boolean value if set to false then writes * the carbondata and carbonindex files in a flat folder structure * @return updated CarbonWriterBuilder */ - public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) { - Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null"); - this.isUnManagedTable = isUnManagedTable; + public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable) { + Objects.requireNonNull(isTransactionalTable, "Transactional Table should not be null"); + this.isTransactionalTable = isTransactionalTable; return this; } @@ -180,7 +194,7 @@ } // build LoadModel - return buildLoadModel(table, UUID); + return buildLoadModel(table, UUID, taskNo); } /** @@ -209,7 +223,7 @@ } String tableName; String dbName; - if (!isUnManagedTable) { + if (isTransactionalTable) { tableName = "_tempTable"; dbName = "_tempDB"; } else { @@ -223,7 +237,7 @@ .databaseName(dbName) .tablePath(path) .tableSchema(schema) - .isUnManagedTable(isUnManagedTable) + .isTransactionalTable(isTransactionalTable) .build(); return table; } @@ -261,13 +275,13 @@ /** * Build a {@link CarbonLoadModel} */ - private CarbonLoadModel buildLoadModel(CarbonTable 
table, long UUID) + private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, String taskNo) throws InvalidLoadOptionException, IOException { Map<String, String> options = new HashMap<>(); if (sortColumns != null) { options.put("sort_columns", Strings.mkString(sortColumns, ",")); } CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table); - return builder.build(options, UUID); + return builder.build(options, UUID, taskNo); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java index 25c34e0..c30bd3a 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -66,6 +66,7 @@ public class AvroCarbonWriterTest { CarbonWriter writer = CarbonWriter.builder() .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) .outputPath(path) + .isTransactionalTable(true) .buildWriterForAvroInput(); for (int i = 0; i < 100; i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index eecbf5f..41fde66 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -90,6 +90,7 @@ public class CSVCarbonWriterTest { try { 
CarbonWriterBuilder builder = CarbonWriter.builder() .withSchema(new Schema(fields)) + .isTransactionalTable(true) .outputPath(path); CarbonWriter writer = builder.buildWriterForCSVInput(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java new file mode 100644 index 0000000..32fe6e8 --- /dev/null +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; + +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test suite for {@link CSVCarbonWriter} + */ +public class CSVNonTransactionalCarbonWriterTest { + + @Test + public void testWriteFiles() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testWriteFilesJsonSchema() throws IOException { + String path = "./testWriteFilesJsonSchema"; + FileUtils.deleteDirectory(new File(path)); + + String schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString(); + + writeFilesAndVerify(Schema.parseJson(schema), path); + + FileUtils.deleteDirectory(new File(path)); + } + + private void writeFilesAndVerify(Schema schema, String path) { + writeFilesAndVerify(schema, path, null); + } + + private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { + writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1); + } + + private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { + writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); + } + + /** + * Invoke CarbonWriter API to write carbon files and assert the file is rewritten + * @param 
rows number of rows to write + * @param schema schema of the file + * @param path local write path + * @param sortColumns sort columns + * @param persistSchema true if want to persist schema file + * @param blockletSize blockletSize in the file, -1 for default size + * @param blockSize blockSize in the file, -1 for default size + */ + private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, + boolean persistSchema, int blockletSize, int blockSize) { + try { + CarbonWriterBuilder builder = CarbonWriter.builder() + .withSchema(schema) + .isTransactionalTable(false) + .uniqueIdentifier(System.currentTimeMillis()) + .taskNo(Long.toString(System.nanoTime())) + .outputPath(path); + if (sortColumns != null) { + builder = builder.sortBy(sortColumns); + } + if (persistSchema) { + builder = builder.persistSchemaFile(true); + } + if (blockletSize != -1) { + builder = builder.withBlockletSize(blockletSize); + } + if (blockSize != -1) { + builder = builder.withBlockSize(blockSize); + } + + CarbonWriter writer = builder.buildWriterForCSVInput(); + + for (int i = 0; i < rows; i++) { + writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); + } + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } catch (InvalidLoadOptionException l) { + l.printStackTrace(); + Assert.fail(l.getMessage()); + } + + File segmentFolder = new File(path); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertTrue(dataFiles.length > 0); + } + + + @Test + public void testAllPrimitiveDataType() throws IOException { + // TODO: write all data type and read by CarbonRecordReader to verify the content + String path = "./testWriteFiles"; + 
FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[9]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("intField", DataTypes.INT); + fields[2] = new Field("shortField", DataTypes.SHORT); + fields[3] = new Field("longField", DataTypes.LONG); + fields[4] = new Field("doubleField", DataTypes.DOUBLE); + fields[5] = new Field("boolField", DataTypes.BOOLEAN); + fields[6] = new Field("dateField", DataTypes.DATE); + fields[7] = new Field("timeField", DataTypes.TIMESTAMP); + fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder() + .withSchema(new Schema(fields)) + .uniqueIdentifier(System.currentTimeMillis()) + .isTransactionalTable(false) + .taskNo(Long.toString(System.nanoTime())) + .outputPath(path); + + CarbonWriter writer = builder.buildWriterForCSVInput(); + + for (int i = 0; i < 100; i++) { + String[] row = new String[]{ + "robot" + (i % 10), + String.valueOf(i), + String.valueOf(i), + String.valueOf(Long.MAX_VALUE - i), + String.valueOf((double) i / 2), + String.valueOf(true), + "2019-03-02", + "2019-02-12 03:03:34" + }; + writer.write(row); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(path); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertTrue(dataFiles.length > 0); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void test2Blocklet() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + 
writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100); + + // TODO: implement reader to verify the number of blocklet in the file + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void test2Block() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2); + + File segmentFolder = new File(path); + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(2, dataFiles.length); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testSortColumns() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path, new String[]{"name"}); + + // TODO: implement reader and verify the data is sorted + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testPartitionOutput() { + // TODO: test write data with partition + } + + @Test + public void testSchemaPersistence() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path, true); + + String schemaFile = CarbonTablePath.getSchemaFilePath(path); + Assert.assertTrue(new File(schemaFile).exists()); + + FileUtils.deleteDirectory(new File(path)); + } + 
+} http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java deleted file mode 100644 index 4bcdfff..0000000 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.sdk.file; - -import java.io.File; -import java.io.FileFilter; -import java.io.FilenameFilter; -import java.io.IOException; - -import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -import org.apache.commons.io.FileUtils; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test suite for {@link CSVCarbonWriter} - */ -public class CSVUnManagedCarbonWriterTest { - - @Test - public void testWriteFiles() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(new Schema(fields), path); - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void testWriteFilesJsonSchema() throws IOException { - String path = "./testWriteFilesJsonSchema"; - FileUtils.deleteDirectory(new File(path)); - - String schema = new StringBuilder() - .append("[ \n") - .append(" {\"name\":\"string\"},\n") - .append(" {\"age\":\"int\"},\n") - .append(" {\"height\":\"double\"}\n") - .append("]") - .toString(); - - writeFilesAndVerify(Schema.parseJson(schema), path); - - FileUtils.deleteDirectory(new File(path)); - } - - private void writeFilesAndVerify(Schema schema, String path) { - writeFilesAndVerify(schema, path, null); - } - - private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { - writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1); - } - - private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { - writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); - } - - /** - * Invoke CarbonWriter API to write carbon files and assert the 
file is rewritten - * @param rows number of rows to write - * @param schema schema of the file - * @param path local write path - * @param sortColumns sort columns - * @param persistSchema true if want to persist schema file - * @param blockletSize blockletSize in the file, -1 for default size - * @param blockSize blockSize in the file, -1 for default size - */ - private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, - boolean persistSchema, int blockletSize, int blockSize) { - try { - CarbonWriterBuilder builder = CarbonWriter.builder() - .withSchema(schema) - .unManagedTable(true) - .uniqueIdentifier(System.currentTimeMillis()) - .outputPath(path); - if (sortColumns != null) { - builder = builder.sortBy(sortColumns); - } - if (persistSchema) { - builder = builder.persistSchemaFile(true); - } - if (blockletSize != -1) { - builder = builder.withBlockletSize(blockletSize); - } - if (blockSize != -1) { - builder = builder.withBlockSize(blockSize); - } - - CarbonWriter writer = builder.buildWriterForCSVInput(); - - for (int i = 0; i < rows; i++) { - writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); - } - writer.close(); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } catch (InvalidLoadOptionException l) { - l.printStackTrace(); - Assert.fail(l.getMessage()); - } - - File segmentFolder = new File(path); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); - } - }); - Assert.assertNotNull(dataFiles); - Assert.assertTrue(dataFiles.length > 0); - } - - - @Test - public void testAllPrimitiveDataType() throws IOException { - // TODO: write all data type and read by CarbonRecordReader to verify the content - String path = "./testWriteFiles"; - 
FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[9]; - fields[0] = new Field("stringField", DataTypes.STRING); - fields[1] = new Field("intField", DataTypes.INT); - fields[2] = new Field("shortField", DataTypes.SHORT); - fields[3] = new Field("longField", DataTypes.LONG); - fields[4] = new Field("doubleField", DataTypes.DOUBLE); - fields[5] = new Field("boolField", DataTypes.BOOLEAN); - fields[6] = new Field("dateField", DataTypes.DATE); - fields[7] = new Field("timeField", DataTypes.TIMESTAMP); - fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); - - try { - CarbonWriterBuilder builder = CarbonWriter.builder() - .withSchema(new Schema(fields)) - .uniqueIdentifier(System.currentTimeMillis()) - .unManagedTable(true) - .outputPath(path); - - CarbonWriter writer = builder.buildWriterForCSVInput(); - - for (int i = 0; i < 100; i++) { - String[] row = new String[]{ - "robot" + (i % 10), - String.valueOf(i), - String.valueOf(i), - String.valueOf(Long.MAX_VALUE - i), - String.valueOf((double) i / 2), - String.valueOf(true), - "2019-03-02", - "2019-02-12 03:03:34" - }; - writer.write(row); - } - writer.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - - File segmentFolder = new File(path); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); - } - }); - Assert.assertNotNull(dataFiles); - Assert.assertTrue(dataFiles.length > 0); - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void test2Blocklet() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(1000 * 1000, new 
Schema(fields), path, null, false, 1, 100); - - // TODO: implement reader to verify the number of blocklet in the file - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void test2Block() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2); - - File segmentFolder = new File(path); - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); - } - }); - Assert.assertNotNull(dataFiles); - Assert.assertEquals(2, dataFiles.length); - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void testSortColumns() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(new Schema(fields), path, new String[]{"name"}); - - // TODO: implement reader and verify the data is sorted - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void testPartitionOutput() { - // TODO: test write data with partition - } - - @Test - public void testSchemaPersistence() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(new Schema(fields), path, true); - - String schemaFile = CarbonTablePath.getSchemaFilePath(path); - Assert.assertTrue(new File(schemaFile).exists()); - - FileUtils.deleteDirectory(new File(path)); - } - -} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java index 068164d..03aecb8 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java @@ -56,6 +56,7 @@ public class TestUtil { try { CarbonWriterBuilder builder = CarbonWriter.builder() .withSchema(schema) + .isTransactionalTable(true) .outputPath(path); if (sortColumns != null) { builder = builder.sortBy(sortColumns);
