Repository: carbondata Updated Branches: refs/heads/master b6777fcc3 -> 5fc7f06f2
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 153b169..07491d1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -65,7 +65,7 @@ object AlterTableUtil { sys.error(s"Table $dbName.$tableName does not exist") } // acquire the lock first - val table = relation.tableMeta.carbonTable + val table = relation.carbonTable val acquiredLocks = ListBuffer[ICarbonLock]() try { locksToBeAcquired.foreach { lock => @@ -133,7 +133,7 @@ object AlterTableUtil { thriftTable: TableInfo)(sparkSession: SparkSession, sessionState: CarbonSessionState): Unit = { val dbName = carbonTable.getDatabaseName - val tableName = carbonTable.getFactTableName + val tableName = carbonTable.getTableName CarbonEnv.getInstance(sparkSession).carbonMetastore .updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier, carbonTable.getCarbonTableIdentifier, @@ -232,10 +232,7 @@ object AlterTableUtil { def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - val carbonTable = metastore - .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta - .carbonTable - + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, carbonTable.getCarbonTableIdentifier) val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) @@ -262,9 +259,7 @@ object AlterTableUtil { def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - val carbonTable = metastore - .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta - .carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, carbonTable.getCarbonTableIdentifier) val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) @@ -297,9 +292,7 @@ object AlterTableUtil { def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - val carbonTable = metastore - .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta - .carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, carbonTable.getCarbonTableIdentifier) val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) @@ -343,30 +336,27 @@ object AlterTableUtil { val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) var locks = List.empty[ICarbonLock] var timeStamp = 0L - var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]() var carbonTable: CarbonTable = null try { locks = AlterTableUtil .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - carbonTable = metastore - .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - .tableMeta.carbonTable + carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) // get the latest carbon table // read the latest schema file val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, carbonTable.getCarbonTableIdentifier) val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) val schemaConverter = new ThriftWrapperSchemaConverterImpl() - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(thriftTableInfo, - dbName, - tableName, - carbonTable.getTablePath) + val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( + thriftTableInfo, + dbName, + tableName, + carbonTable.getTablePath) val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry schemaEvolutionEntry.setTimeStamp(timeStamp) - val thriftTable = schemaConverter - .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) + val thriftTable = schemaConverter.fromWrapperToExternalTableInfo( + wrapperTableInfo, dbName, tableName) val tblPropertiesMap: mutable.Map[String, String] = thriftTable.fact_table.getTableProperties.asScala if (set) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala index dcfbaea..c05c0f1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala @@ -18,7 +18,6 @@ package org.apache.spark.util import org.apache.spark.sql.{CarbonEnv, SparkSession} -import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.api.CarbonStore @@ -40,9 +39,7 @@ object CleanFiles { def cleanFiles(spark: SparkSession, dbName: String, tableName: String, storePath: String, forceTableClean: Boolean = false): Unit = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. - lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark) CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala index 8375762..d682b21 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala @@ -17,7 +17,6 @@ package org.apache.spark.util import org.apache.spark.sql.{CarbonEnv, SparkSession} - import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.api.CarbonStore @@ -30,9 +29,7 @@ object DeleteSegmentByDate { def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String, dateValue: String): Unit = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. - lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark) CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala index 9b87504..5b58c8d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala @@ -34,9 +34,7 @@ object DeleteSegmentById { def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String, segmentIds: Seq[String]): Unit = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. - lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark) CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala index 23cba20..287191c 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala @@ -245,7 +245,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { test("Alter table add partition: List Partition") { sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val list_info = partitionInfo.getListInfo assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava) @@ -286,7 +286,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""") val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area") - val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName) val partitionIds2 = partitionInfo2.getPartitionIds val list_info2 = partitionInfo2.getListInfo assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava) @@ -304,7 +304,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { test("Alter table add partition: Range Partition") { sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""") val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val range_info = partitionInfo.getRangeInfo assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava) @@ -342,7 +342,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""") val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate") - val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName) val partitionIds1 = partitionInfo1.getPartitionIds val range_info1 = partitionInfo1.getRangeInfo assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava) @@ -373,7 +373,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { test("Alter table split partition: List Partition") { sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val list_info = partitionInfo.getListInfo assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5).map(Integer.valueOf(_)).asJava) @@ -415,7 +415,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE list_table_country DROP PARTITION(8)""") val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country") - val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName) val partitionIds1 = partitionInfo1.getPartitionIds val list_info1 = partitionInfo1.getListInfo assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava) @@ -438,7 +438,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE list_table_country ADD PARTITION ('(Part1, Part2, Part3, Part4)')""".stripMargin) sql("""ALTER TABLE list_table_country SPLIT PARTITION(9) INTO ('Part4', 'Part2', '(Part1, Part3)')""".stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val list_info = partitionInfo.getListInfo assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava) @@ -485,7 +485,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE list_table_area ADD PARTITION ('(One,Two, Three, Four)')""".stripMargin) sql("""ALTER TABLE list_table_area SPLIT PARTITION(6) INTO ('One', '(Two, Three )', 'Four')""".stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val list_info = partitionInfo.getListInfo assert(partitionIds == List(0, 1, 3, 4, 5, 7, 8, 9).map(Integer.valueOf(_)).asJava) @@ -528,7 +528,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { test("Alter table split partition: Range Partition") { sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""") val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val rangeInfo = partitionInfo.getRangeInfo assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava) @@ -566,7 +566,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""") val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split") - val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName) val partitionIds1 = partitionInfo1.getPartitionIds val rangeInfo1 = partitionInfo1.getRangeInfo assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava) @@ -586,7 +586,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { test("Alter table split partition: Range Partition + Bucket") { sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""") val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val rangeInfo = partitionInfo.getRangeInfo assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava) @@ -624,7 +624,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""") val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") - val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName) val partitionIds1 = partitionInfo1.getPartitionIds val rangeInfo1 = partitionInfo1.getRangeInfo assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava) @@ -642,7 +642,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""") val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") - val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName) val partitionIds2 = partitionInfo2.getPartitionIds val rangeInfo2 = partitionInfo2.getRangeInfo assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava) @@ -659,7 +659,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""") val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") - val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getTableName) val partitionIds3 = partitionInfo3.getPartitionIds val rangeInfo3 = partitionInfo3.getRangeInfo assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava) @@ -789,7 +789,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("ALTER TABLE carbon_table_default_db ADD PARTITION ('2017')") val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_carbon_table_default_db") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionIds = partitionInfo.getPartitionIds val range_info = partitionInfo.getRangeInfo assert(partitionIds == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava) @@ -809,7 +809,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { sql("ALTER TABLE carbondb.carbontable ADD PARTITION ('2017')") val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("carbondb_carbontable") - val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getFactTableName) + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getTableName) val partitionIds1 = partitionInfo1.getPartitionIds val range_info1 = partitionInfo1.getRangeInfo assert(partitionIds1 == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index a3024be..3f5d8c6 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -43,12 +43,12 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { header: String, allDictFilePath: String): CarbonLoadModel = { val carbonLoadModel = new CarbonLoadModel - carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName) - val table = relation.tableMeta.carbonTable + carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName) + carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName) + val table = relation.carbonTable val carbonSchema = new CarbonDataLoadSchema(table) carbonLoadModel.setDatabaseName(table.getDatabaseName) - carbonLoadModel.setTableName(table.getFactTableName) + carbonLoadModel.setTableName(table.getTableName) carbonLoadModel.setCarbonDataLoadSchema(carbonSchema) carbonLoadModel.setFactFilePath(filePath) carbonLoadModel.setCsvHeader(header) @@ -141,10 +141,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { test("Support generate global dictionary from all dictionary files") { val header = "id,name,city,age" val carbonLoadModel = buildCarbonLoadModel(sampleRelation, null, header, sampleAllDictionaryFile) - GlobalDictionaryUtil - .generateGlobalDictionary(sqlContext, - carbonLoadModel, - sampleRelation.tableMeta.storePath) + GlobalDictionaryUtil.generateGlobalDictionary( + sqlContext, carbonLoadModel, sampleRelation.carbonTable.getTablePath) DictionaryTestCaseUtil. checkDictionary(sampleRelation, "city", "shenzhen") @@ -156,7 +154,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { GlobalDictionaryUtil .generateGlobalDictionary(sqlContext, carbonLoadModel, - complexRelation.tableMeta.storePath) + complexRelation.carbonTable.getTablePath) DictionaryTestCaseUtil. checkDictionary(complexRelation, "channelsId", "1650") http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala index 930de43..4551120 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala @@ -38,9 +38,9 @@ object DictionaryTestCaseUtil { * @param value a value of column */ def checkDictionary(relation: CarbonRelation, columnName: String, value: String) { - val table = relation.tableMeta.carbonTable - val dimension = table.getDimensionByName(table.getFactTableName, columnName) - val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid") + val table = relation.carbonTable + val dimension = table.getDimensionByName(table.getTableName, columnName) + val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getTableName, "uniqueid") val absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier) val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, dimension.getColumnIdentifier, dimension.getDataType, http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index d37a68b..78ae384 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -151,12 +151,12 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft extColFilePath: String, csvDelimiter: String = ","): CarbonLoadModel = { val carbonLoadModel = new CarbonLoadModel - carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName) - val table = relation.tableMeta.carbonTable + carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName) + carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName) + val table = relation.carbonTable val carbonSchema = new CarbonDataLoadSchema(table) carbonLoadModel.setDatabaseName(table.getDatabaseName) - carbonLoadModel.setTableName(table.getFactTableName) + carbonLoadModel.setTableName(table.getTableName) carbonLoadModel.setCarbonDataLoadSchema(carbonSchema) carbonLoadModel.setFactFilePath(filePath) carbonLoadModel.setCsvHeader(header) @@ -198,7 +198,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1, header, extColDictFilePath1) GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel, - extComplexRelation.tableMeta.storePath) + extComplexRelation.carbonTable.getTablePath) // check whether the dictionary is generated DictionaryTestCaseUtil.checkDictionary( extComplexRelation, "deviceInformationId", "10086") @@ -207,7 +207,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1, header, extColDictFilePath2) GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel, - extComplexRelation.tableMeta.storePath) + extComplexRelation.carbonTable.getTablePath) // check the old dictionary and whether the new distinct value is generated DictionaryTestCaseUtil.checkDictionary( extComplexRelation, "deviceInformationId", "10086") @@ -220,7 +220,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1, header, extColDictFilePath3) GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel, - extComplexRelation.tableMeta.storePath) + extComplexRelation.carbonTable.getTablePath) // check whether the dictionary is generated DictionaryTestCaseUtil.checkDictionary( extComplexRelation, "channelsId", "1421|") @@ -229,7 +229,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft carbonLoadModel = buildCarbonLoadModel(verticalDelimiteRelation, complexFilePath2, header2, extColDictFilePath3, "|") GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel, - verticalDelimiteRelation.tableMeta.storePath) + verticalDelimiteRelation.carbonTable.getTablePath) // check whether the dictionary is generated DictionaryTestCaseUtil.checkDictionary( verticalDelimiteRelation, "channelsId", "1431,") http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 442d93e..71c3dc2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -182,9 +182,9 @@ public final class DataLoadProcessBuilder { loadModel.getBadRecordsLocation()); CarbonMetadata.getInstance().addCarbonTable(carbonTable); List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + carbonTable.getMeasureByTableName(carbonTable.getTableName()); Map<String, String> dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat()); List<DataField> dataFields = new ArrayList<>(); @@ -209,7 +209,7 @@ public final class DataLoadProcessBuilder { } } configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()])); - configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName())); + configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getTableName())); // configuration for one pass load: dictionary server info configuration.setUseOnePass(loadModel.getUseOnePass()); configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index be3572c..65f70a0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -209,7 +209,7 @@ public class CarbonCompactionExecutor { List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); for (CarbonDimension dim : dimensions) { // check if dimension is deleted QueryDimension queryDimension = new QueryDimension(dim.getColName()); @@ -220,7 +220,7 @@ public class CarbonCompactionExecutor { List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + carbonTable.getMeasureByTableName(carbonTable.getTableName()); for (CarbonMeasure carbonMeasure : measures) { // check if measure is deleted QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index 08b8600..c60bb24 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -308,7 +308,7 @@ public class CarbonCompactionUtil { public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap, CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) { List<CarbonDimension> masterDimensions = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size()); for (CarbonDimension dimension : masterDimensions) { Integer value = columnCardinalityMap.get(dimension.getColumnId()); @@ -321,7 +321,7 @@ public class CarbonCompactionUtil { } // add measures to the column schema list List<CarbonMeasure> masterSchemaMeasures = - carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + carbonTable.getMeasureByTableName(carbonTable.getTableName()); for (CarbonMeasure measure : masterSchemaMeasures) { updatedColumnSchemaList.add(measure.getColumnSchema()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index c1df349..8f6d19c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -1264,7 +1264,7 @@ public final class CarbonDataMergerUtil { lockStatus = carbonLock.lockWithRetries(); if (lockStatus) { LOGGER.info( - "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() + "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName() + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = @@ -1284,18 +1284,18 @@ public final class CarbonDataMergerUtil { } } else { LOGGER.error("Not able to acquire the lock for Table status updation for table " + table - .getDatabaseName() + "." + table.getFactTableName()); + .getDatabaseName() + "." + table.getTableName()); } } finally { if (lockStatus) { if (carbonLock.unlock()) { LOGGER.info( "Table unlocked successfully after table status updation" + table.getDatabaseName() - + "." + table.getFactTableName()); + + "." + table.getTableName()); } else { LOGGER.error( "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table - .getFactTableName() + " during table status updation"); + .getTableName() + " during table status updation"); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java deleted file mode 100644 index 09dbfff..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.merger; - -import java.io.Serializable; - -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; - -public class TableMeta implements Serializable { - - private static final long serialVersionUID = -1749874611119829431L; - - public CarbonTableIdentifier carbonTableIdentifier; - public String storePath; - public CarbonTable carbonTable; - public String tablePath; - - public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, String tablePath, - CarbonTable carbonTable) { - this.carbonTableIdentifier = carbonTableIdentifier; - this.storePath = storePath; - this.tablePath = tablePath; - this.carbonTable = carbonTable; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java index aeddac6..36e022b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java @@ -78,7 +78,7 @@ public abstract class AbstractCarbonQueryExecutor { List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); for (CarbonDimension dim : dimensions) { // check if dimension is deleted QueryDimension queryDimension = new QueryDimension(dim.getColName()); @@ -89,7 +89,7 @@ public abstract class AbstractCarbonQueryExecutor { List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + carbonTable.getMeasureByTableName(carbonTable.getTableName()); for (CarbonMeasure carbonMeasure : measures) { // check if measure is deleted QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index 1db414f..48c5471 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -46,7 +46,7 @@ public class RowResultProcessor { SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) { CarbonDataProcessorUtil.createLocations(tempStoreLocation); this.segmentProperties = segProp; - String tableName = carbonTable.getFactTableName(); + String tableName = carbonTable.getTableName(); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 504e7ec..75fcea3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -17,7 +17,6 @@ package org.apache.carbondata.processing.store; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -42,19 +41,15 @@ import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter; import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.loading.sort.SortScopeOptions; -import org.apache.carbondata.processing.store.file.FileManager; -import org.apache.carbondata.processing.store.file.IFileManagerComposite; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter; @@ -77,10 +72,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private CarbonFactDataWriter dataWriter; /** - * File manager - */ - private IFileManagerComposite fileManager; - /** * total number of entries in blocklet */ private int entryCount; @@ -91,11 +82,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private int pageSize; - // This variable is true if it is dictionary dimension and its cardinality is lower than - // property of CarbonCommonConstants.HIGH_CARDINALITY_VALUE - // It decides whether it will do RLE encoding on data page for this dimension - private boolean[] rleEncodingForDictDimension; - private boolean[] isNoDictionary; private long processedDataCount; private ExecutorService producerExecutorService; private List<Future<Void>> producerExecutorServiceTaskList; @@ -130,12 +116,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private boolean processingComplete; /** - * boolean to check whether dimension - * is of dictionary type or no dictionary type - */ - private boolean[] isDictDimension; - - /** * current data format version */ private ColumnarFormatVersion version; @@ -146,47 +126,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) { this.model = model; initParameters(model); - - int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount() - + getExpandedComplexColsCount(); - this.rleEncodingForDictDimension = new boolean[numDimColumns]; - this.isNoDictionary = new boolean[numDimColumns]; - - int noDictStartIndex = this.colGrpModel.getNoOfColumnStore(); - // setting true value for dims of high card - for (int i = 0; i < model.getNoDictionaryCount(); i++) { - this.isNoDictionary[noDictStartIndex + i] = true; - } - - boolean isAggKeyBlock = Boolean.parseBoolean( - CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK, - CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE)); - if (isAggKeyBlock) { - int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality(); - for (int i = 0; i < model.getTableSpec().getNumSimpleDimensions(); i++) { - if (model.getSegmentProperties().getDimensions().get(i).isGlobalDictionaryEncoding()) { - this.rleEncodingForDictDimension[i] = true; - } - } - - if (model.getDimensionCount() < dimLens.length) { - int allColsCount = getColsCount(model.getDimensionCount()); - List<Boolean> rleWithComplex = new ArrayList<Boolean>(allColsCount); - for (int i = 0; i < model.getDimensionCount(); i++) { - GenericDataType complexDataType = model.getComplexIndexMap().get(i); - if (complexDataType != null) { - complexDataType.fillAggKeyBlock(rleWithComplex, this.rleEncodingForDictDimension); - } else { - rleWithComplex.add(this.rleEncodingForDictDimension[i]); - } - } - this.rleEncodingForDictDimension = new boolean[allColsCount]; - for (int i = 0; i < allColsCount; i++) { - this.rleEncodingForDictDimension[i] = rleWithComplex.get(i); - } - } - } this.version = CarbonProperties.getInstance().getFormatVersion(); StringBuffer noInvertedIdxCol = new StringBuffer(); for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) { @@ -202,13 +141,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { SortScopeOptions.SortScope sortScope = model.getSortScope(); this.colGrpModel = model.getSegmentProperties().getColumnGroupModel(); - //TODO need to pass carbon table identifier to metadata - CarbonTable carbonTable = - CarbonMetadata.getInstance().getCarbonTable( - model.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + model.getTableName()); - isDictDimension = - CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(model.getTableName())); - // in compaction flow the measure with decimal type will come as spark decimal. // need to convert it to byte array. if (model.isCompactionFlow()) { @@ -247,19 +179,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer)); } - private boolean[] arrangeUniqueBlockType(boolean[] aggKeyBlock) { - int counter = 0; - boolean[] uniqueBlock = new boolean[aggKeyBlock.length]; - for (int i = 0; i < isDictDimension.length; i++) { - if (isDictDimension[i]) { - uniqueBlock[i] = aggKeyBlock[counter++]; - } else { - uniqueBlock[i] = false; - } - } - return uniqueBlock; - } - private void setComplexMapSurrogateIndex(int dimensionCount) { int surrIndex = 0; for (int i = 0; i < dimensionCount; i++) { @@ -283,9 +202,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * @throws CarbonDataWriterException */ public void initialise() throws CarbonDataWriterException { - fileManager = new FileManager(); - // todo: the fileManager seems to be useless, remove it later - fileManager.setName(new File(model.getStoreLocation()[0]).getName()); setWritingConfiguration(); } @@ -412,27 +328,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } } - private int getColsCount(int columnSplit) { - int count = 0; - for (int i = 0; i < columnSplit; i++) { - GenericDataType complexDataType = model.getComplexIndexMap().get(i); - if (complexDataType != null) { - count += complexDataType.getColsCount(); - } else count++; - } - return count; - } - // return the number of complex column after complex columns are expanded private int getExpandedComplexColsCount() { return model.getExpandedComplexColsCount(); } - // return the number of complex column - private int getComplexColumnCount() { - return model.getComplexIndexMap().size(); - } - /** * below method will be used to close the handler */ @@ -519,7 +419,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { this.dataWriter = getFactDataWriter(); // initialize the channel; this.dataWriter.initializeWriter(); - //initializeColGrpMinMax(); } /** @@ -571,14 +470,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { carbonDataWriterVo.setStoreLocation(model.getStoreLocation()); carbonDataWriterVo.setMeasureCount(model.getMeasureCount()); carbonDataWriterVo.setTableName(model.getTableName()); - carbonDataWriterVo.setFileManager(fileManager); - carbonDataWriterVo.setRleEncodingForDictDim(rleEncodingForDictDimension); - carbonDataWriterVo.setIsComplexType(isComplexTypes()); carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount()); carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes()); carbonDataWriterVo.setDatabaseName(model.getDatabaseName()); carbonDataWriterVo.setWrapperColumnSchemaList(model.getWrapperColumnSchema()); - carbonDataWriterVo.setIsDictionaryColumn(isDictDimension); carbonDataWriterVo.setCarbonDataDirectoryPath(model.getCarbonDataDirectoryPath()); carbonDataWriterVo.setColCardinality(model.getColCardinality()); carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties()); @@ -590,31 +485,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { return carbonDataWriterVo; } - private boolean[] isComplexTypes() { - int noDictionaryCount = model.getNoDictionaryCount(); - int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + getComplexColumnCount(); - int allColsCount = getColsCount(noOfColumn); - boolean[] isComplexType = new boolean[allColsCount]; - - List<Boolean> complexTypesList = new ArrayList<Boolean>(allColsCount); - for (int i = 0; i < noOfColumn; i++) { - GenericDataType complexDataType = model.getComplexIndexMap().get(i - noDictionaryCount); - if (complexDataType != null) { - int count = complexDataType.getColsCount(); - for (int j = 0; j < count; j++) { - complexTypesList.add(true); - } - } else { - complexTypesList.add(false); - } - } - for (int i = 0; i < allColsCount; i++) { - isComplexType[i] = complexTypesList.get(i); - } - - return isComplexType; - } - /** * This method will reset the block processing count */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java deleted file mode 100644 index ddd9bf2..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.store.file; - - -public class FileData extends FileManager { - - /** - * Store Path - */ - private String storePath; - - /** - * hierarchyValueWriter - */ - - public FileData(String fileName, String storePath) { - this.fileName = fileName; - this.storePath = storePath; - } - - /** - * @return Returns the carbonDataFileTempPath. - */ - public String getFileName() { - return fileName; - } - - /** - * @return Returns the storePath. - */ - public String getStorePath() { - return storePath; - } - -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java deleted file mode 100644 index cfa3a66..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.store.file; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; - -public class FileManager implements IFileManagerComposite { - /** - * listOfFileData, composite parent which holds the different objects - */ - protected List<IFileManagerComposite> listOfFileData = - new ArrayList<IFileManagerComposite>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - - protected String fileName; - - @Override public void add(IFileManagerComposite customData) { - listOfFileData.add(customData); - } - - @Override public void remove(IFileManagerComposite customData) { - listOfFileData.remove(customData); - - } - - @Override public IFileManagerComposite get(int i) { - return listOfFileData.get(i); - } - - @Override public void setName(String name) { - this.fileName = name; - } - - /** - * Return the size - */ - public int size() { - return listOfFileData.size(); - } - -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java deleted file mode 100644 index 6691772..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.store.file; - -public interface IFileManagerComposite { - /** - * Add the data which can be either row Folder(Composite) or File - * - * @param customData - */ - void add(IFileManagerComposite customData); - - /** - * Remove the CustomData type object from the IFileManagerComposite object hierarchy. - * - * @param customData - */ - void remove(IFileManagerComposite customData); - - /** - * get the CustomData type object name - * - * @return CustomDataIntf type - */ - IFileManagerComposite get(int i); - - /** - * set the CustomData type object name - * - * @param name - */ - void setName(String name); - - /** - * Get the size - * - * @return - */ - int size(); - -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 1b6ba72..855ec03 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -60,7 +60,6 @@ import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.BlockletInfo3; import org.apache.carbondata.format.IndexHeader; import org.apache.carbondata.processing.datamap.DataMapWriterListener; -import org.apache.carbondata.processing.store.file.FileData; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.io.IOUtils; @@ -317,9 +316,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(), dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(), "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); - String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; - FileData fileData = new FileData(actualFileNameVal, chosenTempLocation); - dataWriterVo.getFileManager().add(fileData); this.carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; this.fileCount++; http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java index 26fff09..79cdd95 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java @@ -22,7 +22,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.store.CarbonDataFileAttributes; -import org.apache.carbondata.processing.store.file.IFileManagerComposite; /** * Value object for writing the data @@ -35,12 +34,6 @@ public class CarbonDataWriterVo { private String tableName; - private IFileManagerComposite fileManager; - - private boolean[] rleEncodingForDictDim; - - private boolean[] isComplexType; - private int NoDictionaryCount; private CarbonDataFileAttributes carbonDataFileAttributes; @@ -49,8 +42,6 @@ public class CarbonDataWriterVo { private List<ColumnSchema> wrapperColumnSchemaList; - private boolean[] isDictionaryColumn; - private String carbonDataDirectoryPath; private int[] colCardinality; @@ -110,48 +101,6 @@ public class CarbonDataWriterVo { } /** - * @return the fileManager - */ - public IFileManagerComposite getFileManager() { - return fileManager; - } - - /** - * @param fileManager the fileManager to set - */ - public void setFileManager(IFileManagerComposite fileManager) { - this.fileManager = fileManager; - } - - /** - * @return the rleEncodingForDictDim - */ - public boolean[] getRleEncodingForDictDim() { - return rleEncodingForDictDim; - } - - /** - * @param rleEncodingForDictDim the rleEncodingForDictDim to set - */ - public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) { - this.rleEncodingForDictDim = rleEncodingForDictDim; - } - - /** - * @return the isComplexType - */ - public boolean[] getIsComplexType() { - return isComplexType; - } - - /** - * @param isComplexType the isComplexType to set - */ - public void setIsComplexType(boolean[] isComplexType) { - this.isComplexType = isComplexType; - } - - /** * @return the noDictionaryCount */ public int getNoDictionaryCount() { @@ -208,20 +157,6 @@ public class CarbonDataWriterVo { } /** - * @return the isDictionaryColumn - */ - public boolean[] getIsDictionaryColumn() { - return isDictionaryColumn; - } - - /** - * @param isDictionaryColumn the isDictionaryColumn to set - */ - public void setIsDictionaryColumn(boolean[] isDictionaryColumn) { - this.isDictionaryColumn = isDictionaryColumn; - } - - /** * @return the carbonDataDirectoryPath */ public String getCarbonDataDirectoryPath() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index ca40830..7218a12 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -383,7 +383,7 @@ public final class CarbonDataProcessorUtil { */ public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) { Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - String factTableName = schema.getCarbonTable().getFactTableName(); + String factTableName = schema.getCarbonTable().getTableName(); if (tableName.equals(factTableName)) { List<CarbonDimension> dimensions = schema.getCarbonTable().getDimensionByTableName(factTableName); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 29a979d..db3442e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -263,6 +263,11 @@ public final class CarbonLoaderUtil { AbsoluteTableIdentifier absoluteTableIdentifier = loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); + String metadataPath = carbonTablePath.getMetadataDirectoryPath(); + FileType fileType = FileFactory.getFileType(metadataPath); + if (!FileFactory.isFileExist(metadataPath, fileType)) { + FileFactory.mkdirs(metadataPath, fileType); + } String tableStatusPath = carbonTablePath.getTableStatusFilePath(); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 58cc019..e09e3db 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -297,9 +297,9 @@ public class StoreCreator { String header = reader.readLine(); String[] split = header.split(","); List<CarbonColumn> allCols = new ArrayList<CarbonColumn>(); - List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName()); + List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName()); allCols.addAll(dims); - List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName()); + List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName()); allCols.addAll(msrs); Set<String>[] set = new HashSet[dims.size()]; for (int i = 0; i < set.length; i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index 943858d..7682437 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -66,7 +66,7 @@ public class StreamSegment { try { if (carbonLock.lockWithRetries()) { LOGGER.info( - "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() + "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName() + " for stream table get or create segment"); LoadMetadataDetails[] details = @@ -104,17 +104,17 @@ public class StreamSegment { } else { LOGGER.error( "Not able to acquire the lock for stream table get or create segment for table " + table - .getDatabaseName() + "." + table.getFactTableName()); + .getDatabaseName() + "." + table.getTableName()); throw new IOException("Failed to get stream segment"); } } finally { if (carbonLock.unlock()) { LOGGER.info("Table unlocked successfully after stream table get or create segment" + table - .getDatabaseName() + "." + table.getFactTableName()); + .getDatabaseName() + "." + table.getTableName()); } else { LOGGER.error( "Unable to unlock table lock for stream table" + table.getDatabaseName() + "." + table - .getFactTableName() + " during stream table get or create segment"); + .getTableName() + " during stream table get or create segment"); } } } @@ -132,7 +132,7 @@ public class StreamSegment { try { if (carbonLock.lockWithRetries()) { LOGGER.info( - "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() + "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName() + " for stream table finish segment"); LoadMetadataDetails[] details = @@ -165,17 +165,17 @@ public class StreamSegment { } else { LOGGER.error( "Not able to acquire the lock for stream table status updation for table " + table - .getDatabaseName() + "." + table.getFactTableName()); + .getDatabaseName() + "." + table.getTableName()); throw new IOException("Failed to get stream segment"); } } finally { if (carbonLock.unlock()) { LOGGER.info( "Table unlocked successfully after table status updation" + table.getDatabaseName() - + "." + table.getFactTableName()); + + "." + table.getTableName()); } else { LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table - .getFactTableName() + " during table status updation"); + .getTableName() + " during table status updation"); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 31ed1f6..2c4d35f 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -147,7 +147,7 @@ object StreamSinkFactory { val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters) optionsFinal.put("sort_scope", "no_sort") if (parameters.get("fileheader").isEmpty) { - optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getFactTableName) + optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName) .asScala.map(_.getColName).mkString(",")) } val carbonLoadModel = new CarbonLoadModel() http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala index 6ee3296..c2789f4 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala @@ -42,14 +42,14 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi LockUsage.STREAMING_LOCK) if (lock.lockWithRetries()) { LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." + - carbonTable.getFactTableName) + carbonTable.getTableName) cache.put(event.id, lock) } else { LOGGER.error("Not able to acquire the lock for stream table:" + - carbonTable.getDatabaseName + "." + carbonTable.getFactTableName) + carbonTable.getDatabaseName + "." + carbonTable.getTableName) throw new InterruptedException( "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." + - carbonTable.getFactTableName) + carbonTable.getTableName) } } }