[CARBONDATA-1284] Implement Hive-based schema storage in Carbon. This closes #1149
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9debfc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9debfc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9debfc Branch: refs/heads/metadata Commit: 2a9debfccfdc5633e5e00e95b860efb8b368ea12 Parents: 4b69c9d Author: ravipesala <[email protected]> Authored: Sat Jul 8 17:14:04 2017 +0530 Committer: jackylk <[email protected]> Committed: Tue Jul 11 19:48:57 2017 +0800 ---------------------------------------------------------------------- .../dictionary/ManageDictionaryAndBTree.java | 39 +- .../core/constants/CarbonCommonConstants.java | 4 + .../carbondata/core/locks/CarbonLockUtil.java | 11 +- .../apache/carbondata/core/util/CarbonUtil.java | 127 +++ .../core/util/path/CarbonTablePath.java | 4 + .../partition/TestDDLForPartitionTable.scala | 1 + .../spark/rdd/AlterTableAddColumnRDD.scala | 7 + .../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 + .../apache/carbondata/spark/rdd/Compactor.scala | 2 - .../spark/rdd/DataManagementFunc.scala | 1 - .../spark/rdd/NewCarbonDataLoadRDD.scala | 4 - .../execution/command/carbonTableSchema.scala | 3 - .../spark/rdd/CarbonDataRDDFactory.scala | 47 +- .../spark/DictionaryDetailHelper.scala | 18 +- .../spark/rdd/CarbonDataRDDFactory.scala | 44 +- .../carbondata/spark/util/CarbonSparkUtil.scala | 15 +- .../sql/CarbonDatasourceHadoopRelation.scala | 8 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +- .../org/apache/spark/sql/CarbonSource.scala | 93 +- .../execution/command/AlterTableCommands.scala | 53 +- .../execution/command/CarbonHiveCommands.scala | 14 +- .../sql/execution/command/DDLStrategy.scala | 14 +- .../sql/execution/command/IUDCommands.scala | 5 +- .../execution/command/carbonTableSchema.scala | 87 +- .../spark/sql/hive/CarbonFileMetastore.scala | 531 ++++++++++ .../spark/sql/hive/CarbonHiveMetaStore.scala | 287 ++++++ 
.../apache/spark/sql/hive/CarbonMetaStore.scala | 170 ++++ .../apache/spark/sql/hive/CarbonMetastore.scala | 960 ------------------- .../apache/spark/sql/hive/CarbonRelation.scala | 292 ++++++ .../org/apache/spark/util/AlterTableUtil.scala | 57 +- .../org/apache/spark/util/CleanFiles.scala | 6 +- .../apache/spark/util/DeleteSegmentByDate.scala | 8 +- .../apache/spark/util/DeleteSegmentById.scala | 6 +- .../org/apache/spark/util/ShowSegments.scala | 6 +- .../spark/util/AllDictionaryTestCase.scala | 10 + .../util/ExternalColumnDictionaryTestCase.scala | 10 + .../carbondata/CarbonDataSourceSuite.scala | 1 + .../processing/merger/CarbonCompactionUtil.java | 11 +- 38 files changed, 1744 insertions(+), 1220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java index a50bf15..bae9189 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java @@ -67,25 +67,28 @@ public class ManageDictionaryAndBTree { String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath(); CarbonFile metadataDir = FileFactory .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath)); - // sort index file is created with dictionary size appended to it. 
So all the files - // with a given column ID need to be listed - CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile path) { - if (path.getName().startsWith(columnSchema.getColumnUniqueId())) { - return true; + if (metadataDir.exists()) { + // sort index file is created with dictionary size appended to it. So all the files + // with a given column ID need to be listed + CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile path) { + if (path.getName().startsWith(columnSchema.getColumnUniqueId())) { + return true; + } + return false; + } + }); + for (CarbonFile file : listFiles) { + // try catch is inside for loop because even if one deletion fails, other files + // still need to be deleted + try { + FileFactory.deleteFile(file.getCanonicalPath(), + FileFactory.getFileType(file.getCanonicalPath())); + } catch (IOException e) { + LOGGER.error("Failed to delete dictionary or sortIndex file for column " + + columnSchema.getColumnName() + "with column ID " + + columnSchema.getColumnUniqueId()); } - return false; - } - }); - for (CarbonFile file : listFiles) { - // try catch is inside for loop because even if one deletion fails, other files - // still need to be deleted - try { - FileFactory - .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath())); - } catch (IOException e) { - LOGGER.error("Failed to delete dictionary or sortIndex file for column " + columnSchema - .getColumnName() + "with column ID " + columnSchema.getColumnUniqueId()); } } // remove dictionary cache http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 6393131..55a292e 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1264,6 +1264,10 @@ public final class CarbonCommonConstants { public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE"; + public static final String ENABLE_HIVE_SCHEMA_META_STORE = "spark.carbon.hive.schema.store"; + + public static final String ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT = "false"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java index eaaaf94..60a7564 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java @@ -19,7 +19,7 @@ package org.apache.carbondata.core.locks; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; /** * This class contains all carbon lock utilities @@ -66,15 +66,12 @@ public class CarbonLockUtil { * Given a lock type this method will return a new lock object if not acquired by any other * operation * - * @param carbonTable + * @param identifier * @param lockType * @return */ - public static ICarbonLock getLockObject(CarbonTable carbonTable, - String lockType) { - ICarbonLock carbonLock = CarbonLockFactory - 
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier().getCarbonTableIdentifier(), - lockType); + public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) { + ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(identifier, lockType); LOGGER.info("Trying to acquire lock: " + carbonLock); if (carbonLock.lockWithRetries()) { LOGGER.info("Successfully acquired the lock " + carbonLock); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 8298600..b9c164a 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -31,8 +31,10 @@ import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.carbondata.common.logging.LogService; @@ -58,6 +60,7 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; @@ -70,9 +73,11 @@ import org.apache.carbondata.core.service.PathService; import 
org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; +import com.google.gson.Gson; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -1720,5 +1725,127 @@ public final class CarbonUtil { public static boolean isValidBadStorePath(String badRecordsLocation) { return !(null == badRecordsLocation || badRecordsLocation.length() == 0); } + + public static String convertToMultiGsonStrings(TableInfo tableInfo, String seperator, + String quote, String prefix) { + Gson gson = new Gson(); + String schemaString = gson.toJson(tableInfo); + int schemaLen = schemaString.length(); + int splitLen = 4000; + int parts = schemaLen / splitLen; + if (schemaLen % splitLen > 0) { + parts++; + } + StringBuilder builder = + new StringBuilder(prefix).append(quote).append("carbonSchemaPartsNo").append(quote) + .append(seperator).append("'").append(parts).append("',"); + int runningLen = 0; + int endLen = splitLen; + for (int i = 0; i < parts; i++) { + if (i == parts - 1) { + endLen = schemaLen % splitLen; + } + builder.append(quote).append("carbonSchema").append(i).append(quote).append(seperator); + builder.append("'").append(schemaString.substring(runningLen, runningLen + endLen)) + .append("'"); + if (i < parts - 1) { + builder.append(","); + } + runningLen += splitLen; + } + return builder.toString(); + } + + public static Map<String, String> convertToMultiStringMap(TableInfo tableInfo) { + Map<String, String> map = new HashMap<>(); + Gson gson = new Gson(); + String schemaString = gson.toJson(tableInfo); + int schemaLen = schemaString.length(); + int splitLen = 4000; + int parts = schemaLen / splitLen; + if 
(schemaLen % splitLen > 0) { + parts++; + } + map.put("carbonSchemaPartsNo", parts + ""); + int runningLen = 0; + int endLen = splitLen; + for (int i = 0; i < parts; i++) { + if (i == parts - 1) { + endLen = schemaLen % splitLen; + } + map.put("carbonSchema" + i, schemaString.substring(runningLen, runningLen + endLen)); + runningLen += splitLen; + } + return map; + } + + + public static TableInfo convertGsonToTableInfo(Map<String, String> properties) { + Gson gson = new Gson(); + String partsNo = properties.get("carbonSchemaPartsNo"); + if (partsNo == null) { + return null; + } + int no = Integer.parseInt(partsNo); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < no; i++) { + String part = properties.get("carbonSchema" + i); + if (part == null) { + throw new RuntimeException("Some thing wrong in getting schema from hive metastore"); + } + builder.append(part); + } + TableInfo tableInfo = gson.fromJson(builder.toString(), TableInfo.class); + return tableInfo; + } + + /** + * This method will read the schema file from a given path + * + * @param schemaFilePath + * @return + */ + public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath) + throws IOException { + TBaseCreator createTBase = new ThriftReader.TBaseCreator() { + public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo, + org.apache.carbondata.format.TableInfo._Fields> create() { + return new org.apache.carbondata.format.TableInfo(); + } + }; + ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase); + thriftReader.open(); + org.apache.carbondata.format.TableInfo tableInfo = + (org.apache.carbondata.format.TableInfo) thriftReader.read(); + thriftReader.close(); + return tableInfo; + } + + public static void writeThriftTableToSchemaFile(String schemaFilePath, + org.apache.carbondata.format.TableInfo tableInfo) throws IOException { + ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false); + try { + 
thriftWriter.open(); + thriftWriter.write(tableInfo); + } finally { + thriftWriter.close(); + } + } + + public static void createDatabaseDirectory(String dbName, String storePath) throws IOException { + String databasePath = storePath + File.separator + dbName.toLowerCase(); + FileFactory.FileType fileType = FileFactory.getFileType(databasePath); + FileFactory.mkdirs(databasePath, fileType); + } + + public static void dropDatabaseDirectory(String dbName, String storePath) + throws IOException, InterruptedException { + String databasePath = storePath + File.separator + dbName; + FileFactory.FileType fileType = FileFactory.getFileType(databasePath); + if (FileFactory.isFileExist(databasePath, fileType)) { + CarbonFile dbPath = FileFactory.getCarbonFile(databasePath, fileType); + CarbonUtil.deleteFoldersAndFiles(dbPath); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 101419d..5824d76 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -432,6 +432,10 @@ public class CarbonTablePath extends Path { return tablePath + File.separator + FACT_DIR; } + public CarbonTableIdentifier getCarbonTableIdentifier() { + return carbonTableIdentifier; + } + @Override public boolean equals(Object o) { if (!(o instanceof CarbonTablePath)) { return false; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala index 5b77a24..c38b249 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala @@ -128,6 +128,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll { } test("test describe formatted for partition column") { + sql("drop table if exists des") sql( """create table des(a int, b string) partitioned by (c string) stored by 'carbondata' |tblproperties ('partition_type'='list','list_info'='1,2')""".stripMargin) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala index 7eea95d..2fb72f7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala @@ -22,6 +22,7 @@ import org.apache.spark.rdd.RDD import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema @@ -80,6 +81,12 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext, CarbonCommonConstants.DEFAULT_CHARSET_CLASS) } CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE, lockType) + // Create table and metadata folders if not exist + val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath + val fileType = FileFactory.getFileType(metadataDirectoryPath) + if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { + FileFactory.mkdirs(metadataDirectoryPath, fileType) + } GlobalDictionaryUtil .loadDefaultDictionaryValueForNewColumn(carbonTablePath, columnSchema, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 815dba3..bc5ca06 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -265,6 +265,8 @@ class CarbonMergerRDD[K, V]( val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) + CarbonInputFormat.setTableInfo(job.getConfiguration, + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) var updateDetails: UpdateVO = null // initialise query_id for job job.getConfiguration.set("query.id", queryId) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala index 1a237f6..4cebcd8 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala @@ -39,7 +39,6 @@ object Compactor { val storePath = compactionCallableModel.storePath val storeLocation = compactionCallableModel.storeLocation val carbonTable = compactionCallableModel.carbonTable - val cubeCreationTime = compactionCallableModel.cubeCreationTime val loadsToMerge = compactionCallableModel.loadsToMerge val sc = compactionCallableModel.sqlContext val carbonLoadModel = compactionCallableModel.carbonLoadModel @@ -57,7 +56,6 @@ object Compactor { storePath, carbonTable.getMetaDataFilepath, mergedLoadName, - cubeCreationTime, databaseName, factTableName, validSegments, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala index 954303f..5ab8160 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala @@ -250,7 +250,6 @@ object DataManagementFunc { carbonLoadModel, storeLocation, compactionModel.carbonTable, - compactionModel.tableCreationTime, loadsToMerge, sqlContext, compactionModel.compactionType 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 129c642..7cbe9a4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -388,8 +388,6 @@ class NewDataFrameLoaderRDD[K, V]( result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, loadCount: Integer, - tableCreationTime: Long, - schemaLastUpdatedTime: Long, prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) { override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { @@ -583,8 +581,6 @@ class PartitionTableDataLoaderRDD[K, V]( result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, loadCount: Integer, - tableCreationTime: Long, - schemaLastUpdatedTime: Long, prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) { override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 6174f7c..500e18e 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -90,7 +90,6 @@ case class CarbonMergerMapping(storeLocation: String, hdfsStoreLocation: String, metadataFilePath: String, var mergedLoadName: String, - tableCreationTime: Long, databaseName: String, factTableName: String, validSegments: Array[String], @@ -117,14 +116,12 @@ case class UpdateTableModel(isUpdate: Boolean, case class CompactionModel(compactionSize: Long, compactionType: CompactionType, carbonTable: CarbonTable, - tableCreationTime: Long, isDDLTrigger: Boolean) case class CompactionCallableModel(storePath: String, carbonLoadModel: CarbonLoadModel, storeLocation: String, carbonTable: CarbonTable, - cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext, compactionType: CompactionType) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 3579b8a..31b05bc 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -104,8 +104,6 @@ object CarbonDataRDDFactory { LOGGER.audit(s"Compaction request received for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val tableCreationTime = CarbonEnv.get.carbonMetastore - .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) if (null == carbonLoadModel.getLoadMetadataDetails) { 
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) @@ -118,7 +116,6 @@ object CarbonDataRDDFactory { val compactionModel = CompactionModel(compactionSize, compactionType, carbonTable, - tableCreationTime, isCompactionTriggerByDDl ) @@ -272,23 +269,19 @@ object CarbonDataRDDFactory { if (!isConcurrentCompactionAllowed) { LOGGER.info("System level compaction lock is enabled.") val skipCompactionTables = ListBuffer[CarbonTableIdentifier]() - var tableForCompaction = CarbonCompactionUtil - .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.tablesMeta.toArray, + var table: CarbonTable = CarbonCompactionUtil + .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata. + tablesMeta.map(_.carbonTable).toArray, skipCompactionTables.toList.asJava) - while (null != tableForCompaction) { + while (null != table) { LOGGER.info("Compaction request has been identified for table " + - s"${ tableForCompaction.carbonTable.getDatabaseName }." + - s"${ tableForCompaction.carbonTableIdentifier.getTableName }") - val table: CarbonTable = tableForCompaction.carbonTable + s"${ table.getDatabaseName }." 
+ + s"${ table.getFactTableName }") val metadataPath = table.getMetaDataFilepath val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) val newCarbonLoadModel = new CarbonLoadModel() DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel) - val tableCreationTime = CarbonEnv.get.carbonMetastore - .getTableCreationTime(newCarbonLoadModel.getDatabaseName, - newCarbonLoadModel.getTableName - ) val compactionSize = CarbonDataMergerUtil .getCompactionSize(CompactionType.MAJOR_COMPACTION) @@ -296,7 +289,6 @@ object CarbonDataRDDFactory { val newcompactionModel = CompactionModel(compactionSize, compactionType, table, - tableCreationTime, compactionModel.isDDLTrigger ) // proceed for compaction @@ -309,8 +301,8 @@ object CarbonDataRDDFactory { } catch { case e: Exception => LOGGER.error("Exception in compaction thread for table " + - s"${ tableForCompaction.carbonTable.getDatabaseName }." + - s"${ tableForCompaction.carbonTableIdentifier.getTableName }") + s"${ table.getDatabaseName }." + + s"${ table.getFactTableName }") // not handling the exception. only logging as this is not the table triggered // by user. } finally { @@ -319,16 +311,16 @@ object CarbonDataRDDFactory { .deleteCompactionRequiredFile(metadataPath, compactionType)) { // if the compaction request file is not been able to delete then // add those tables details to the skip list so that it wont be considered next. - skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier) + skipCompactionTables.+=:(table.getCarbonTableIdentifier) LOGGER.error("Compaction request file can not be deleted for table " + - s"${ tableForCompaction.carbonTable.getDatabaseName }." + - s"${ tableForCompaction.carbonTableIdentifier.getTableName }") + s"${ table.getDatabaseName }." + + s"${ table.getFactTableName }") } } // ********* check again for all the tables. 
- tableForCompaction = CarbonCompactionUtil + table = CarbonCompactionUtil .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata - .tablesMeta.toArray, skipCompactionTables.asJava + .tablesMeta.map(_.carbonTable).toArray, skipCompactionTables.asJava ) } // giving the user his error for telling in the beeline if his triggered table @@ -360,7 +352,7 @@ object CarbonDataRDDFactory { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val isAgg = false // for handling of the segment Merging. - def handleSegmentMerging(tableCreationTime: Long): Unit = { + def handleSegmentMerging(): Unit = { LOGGER.info(s"compaction need status is" + s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }") if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) { @@ -371,7 +363,6 @@ object CarbonDataRDDFactory { val compactionModel = CompactionModel(compactionSize, CompactionType.MINOR_COMPACTION, carbonTable, - tableCreationTime, isCompactionTriggerByDDl ) var storeLocation = "" @@ -479,10 +470,6 @@ object CarbonDataRDDFactory { // reading the start time of data load. 
val loadStartTime = CarbonUpdateUtil.readCurrentTime(); carbonLoadModel.setFactTimeStamp(loadStartTime) - val tableCreationTime = CarbonEnv.get.carbonMetastore - .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) - val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore - .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) // get partition way from configuration // val isTableSplitPartition = CarbonProperties.getInstance().getProperty( @@ -634,8 +621,6 @@ object CarbonDataRDDFactory { new DataLoadResultImpl(), carbonLoadModel, currentLoadCount, - tableCreationTime, - schemaLastUpdatedTime, newRdd).collect() } catch { case ex: Exception => @@ -748,8 +733,6 @@ object CarbonDataRDDFactory { new DataLoadResultImpl(), carbonLoadModel, currentLoadCount, - tableCreationTime, - schemaLastUpdatedTime, rdd).collect() } catch { case ex: Exception => @@ -974,7 +957,7 @@ object CarbonDataRDDFactory { } try { // compaction handling - handleSegmentMerging(tableCreationTime) + handleSegmentMerging() } catch { case e: Exception => throw new Exception( http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala index 32ba6cf..779ace1 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala @@ -36,15 +36,17 @@ class DictionaryDetailHelper extends DictionaryDetailService { // Metadata folder val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType) // need list all dictionary file paths with exists 
flag - val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter { - @Override def accept(pathname: CarbonFile): Boolean = { - CarbonTablePath.isDictionaryFile(pathname) - } - }) - // 2 put dictionary file names to fileNamesMap val fileNamesMap = new HashMap[String, Int] - for (i <- 0 until carbonFiles.length) { - fileNamesMap.put(carbonFiles(i).getName, i) + if (metadataDirectory.exists()) { + val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter { + @Override def accept(pathname: CarbonFile): Boolean = { + CarbonTablePath.isDictionaryFile(pathname) + } + }) + // 2 put dictionary file names to fileNamesMap + for (i <- 0 until carbonFiles.length) { + fileNamesMap.put(carbonFiles(i).getName, i) + } } // 3 lookup fileNamesMap, if file name is in fileNamesMap, file is exists, or not. primDimensions.zipWithIndex.foreach { f => http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 5c20808..fc813d1 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -105,8 +105,6 @@ object CarbonDataRDDFactory { LOGGER.audit(s"Compaction request received for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore - .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) if (null == carbonLoadModel.getLoadMetadataDetails) { 
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) @@ -124,7 +122,6 @@ object CarbonDataRDDFactory { val compactionModel = CompactionModel(compactionSize, compactionType, carbonTable, - tableCreationTime, isCompactionTriggerByDDl ) @@ -285,22 +282,18 @@ object CarbonDataRDDFactory { val skipCompactionTables = ListBuffer[CarbonTableIdentifier]() var tableForCompaction = CarbonCompactionUtil .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore - .metadata.tablesMeta.toArray, + .listAllTables(sqlContext.sparkSession).toArray, skipCompactionTables.toList.asJava) while (null != tableForCompaction) { LOGGER.info("Compaction request has been identified for table " + - s"${ tableForCompaction.carbonTable.getDatabaseName }." + - s"${ tableForCompaction.carbonTableIdentifier.getTableName }") - val table: CarbonTable = tableForCompaction.carbonTable + s"${ tableForCompaction.getDatabaseName }." + + s"${ tableForCompaction.getFactTableName}") + val table: CarbonTable = tableForCompaction val metadataPath = table.getMetaDataFilepath val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) val newCarbonLoadModel = new CarbonLoadModel() DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel) - val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession) - .carbonMetastore.getTableCreationTime(newCarbonLoadModel.getDatabaseName, - newCarbonLoadModel.getTableName - ) val compactionSize = CarbonDataMergerUtil .getCompactionSize(CompactionType.MAJOR_COMPACTION) @@ -308,7 +301,6 @@ object CarbonDataRDDFactory { val newcompactionModel = CompactionModel(compactionSize, compactionType, table, - tableCreationTime, compactionModel.isDDLTrigger ) // proceed for compaction @@ -321,8 +313,8 @@ object CarbonDataRDDFactory { } catch { case e: Exception => LOGGER.error("Exception in compaction thread for table " + - s"${ tableForCompaction.carbonTable.getDatabaseName }." 
+ - s"${ tableForCompaction.carbonTableIdentifier.getTableName }") + s"${ tableForCompaction.getDatabaseName }." + + s"${ tableForCompaction.getFactTableName }") // not handling the exception. only logging as this is not the table triggered // by user. } finally { @@ -331,17 +323,17 @@ object CarbonDataRDDFactory { .deleteCompactionRequiredFile(metadataPath, compactionType)) { // if the compaction request file is not been able to delete then // add those tables details to the skip list so that it wont be considered next. - skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier) + skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier) LOGGER.error("Compaction request file can not be deleted for table " + - s"${ tableForCompaction.carbonTable.getDatabaseName }." + - s"${ tableForCompaction.carbonTableIdentifier.getTableName }") + s"${ tableForCompaction.getDatabaseName }." + + s"${ tableForCompaction.getFactTableName }") } } // ********* check again for all the tables. tableForCompaction = CarbonCompactionUtil .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession) - .carbonMetastore.metadata - .tablesMeta.toArray, skipCompactionTables.asJava + .carbonMetastore.listAllTables(sqlContext.sparkSession).toArray, + skipCompactionTables.asJava ) } } @@ -373,7 +365,7 @@ object CarbonDataRDDFactory { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val isAgg = false // for handling of the segment Merging. 
- def handleSegmentMerging(tableCreationTime: Long): Unit = { + def handleSegmentMerging(): Unit = { LOGGER.info(s"compaction need status is" + s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }") if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) { @@ -384,7 +376,6 @@ object CarbonDataRDDFactory { val compactionModel = CompactionModel(compactionSize, CompactionType.MINOR_COMPACTION, carbonTable, - tableCreationTime, isCompactionTriggerByDDl ) var storeLocation = "" @@ -492,11 +483,6 @@ object CarbonDataRDDFactory { // reading the start time of data load. val loadStartTime = CarbonUpdateUtil.readCurrentTime(); carbonLoadModel.setFactTimeStamp(loadStartTime) - val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore - .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) - val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore - .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) - // get partition way from configuration // val isTableSplitPartition = CarbonProperties.getInstance().getProperty( // CarbonCommonConstants.TABLE_SPLIT_PARTITION, @@ -645,8 +631,6 @@ object CarbonDataRDDFactory { new DataLoadResultImpl(), carbonLoadModel, currentLoadCount, - tableCreationTime, - schemaLastUpdatedTime, newRdd).collect() } catch { @@ -760,8 +744,6 @@ object CarbonDataRDDFactory { new DataLoadResultImpl(), carbonLoadModel, currentLoadCount, - tableCreationTime, - schemaLastUpdatedTime, rdd).collect() } catch { case ex: Exception => @@ -998,7 +980,7 @@ object CarbonDataRDDFactory { } try { // compaction handling - handleSegmentMerging(tableCreationTime) + handleSegmentMerging() } catch { case e: Exception => throw new Exception( http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index 2f65fbc..d1d3015 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -19,11 +19,13 @@ package org.apache.carbondata.spark.util import scala.collection.JavaConverters._ -import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap} +import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.processing.merger.TableMeta case class TransformHolder(rdd: Any, mataData: CarbonMetaData) @@ -42,4 +44,13 @@ object CarbonSparkUtil { } CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap)) } + + def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = { + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val table = CarbonTable.buildFromTableInfo(tableInfo) + val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table) + CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName, + CarbonSparkUtil.createSparkMeta(table), meta) + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index d1baf79..7411e6e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -54,11 +54,9 @@ case class CarbonDatasourceHadoopRelation( ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) @transient lazy val carbonRelation: CarbonRelation = - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation( - Some(identifier.getCarbonTableIdentifier.getDatabaseName), - identifier.getCarbonTableIdentifier.getTableName)(sparkSession) - .asInstanceOf[CarbonRelation] + CarbonEnv.getInstance(sparkSession).carbonMetastore. + createCarbonRelation(parameters, identifier, sparkSession) + @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 925b82b..d19eb39 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import java.util.Map import java.util.concurrent.ConcurrentHashMap -import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog} +import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonSessionCatalog} import org.apache.spark.sql.internal.CarbonSQLConf import org.apache.carbondata.common.logging.LogServiceFactory @@ -34,7 +34,7 
@@ import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl */ class CarbonEnv { - var carbonMetastore: CarbonMetastore = _ + var carbonMetastore: CarbonMetaStore = _ var sessionParams: SessionParams = _ @@ -64,7 +64,7 @@ class CarbonEnv { val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) LOGGER.info(s"carbon env initial: $storePath") - new CarbonMetastore(sparkSession.conf, storePath) + CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath) } CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") initialized = true http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 1c16143..498ea03 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -17,20 +17,23 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.commons.lang.StringUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.CarbonLateDecodeStrategy -import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field} +import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor} import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DecimalType, 
StructType} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.metadata.schema.table.TableInfo +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -125,7 +128,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } val path = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { - getPathForTable(sqlContext.sparkSession, dbName, tableName) + getPathForTable(sqlContext.sparkSession, dbName, tableName, parameters) } else { createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema) } @@ -148,20 +151,22 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase val tableName: String = parameters.getOrElse("tableName", "").toLowerCase + try { - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + if (parameters.contains("carbonSchemaPartsNo")) { + getPathForTable(sparkSession, dbName, tableName, parameters) + } else { + CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession) + CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + } } catch { case ex: NoSuchTableException => - val sqlParser = new CarbonSpark2SqlParser - val fields = sqlParser.getFields(dataSchema) - val map = scala.collection.mutable.Map[String, String]() - parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) } - val options = new CarbonOption(parameters) - val bucketFields 
= sqlParser.getBucketFields(map, fields, options) - val cm = sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName), - tableName, fields, Nil, map, bucketFields) + val cm: TableModel = CarbonSource.createTableInfoFromParams( + parameters, + dataSchema, + dbName, + tableName) CreateTable(cm, false).run(sparkSession) CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" case ex: Exception => @@ -171,13 +176,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider /** * Returns the path of the table + * * @param sparkSession * @param dbName * @param tableName * @return */ private def getPathForTable(sparkSession: SparkSession, dbName: String, - tableName : String): String = { + tableName : String, parameters: Map[String, String]): String = { if (StringUtils.isBlank(tableName)) { throw new MalformedCarbonCommandException("The Specified Table Name is Blank") @@ -186,9 +192,13 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider throw new MalformedCarbonCommandException("Table Name Should not have spaces ") } try { - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + if (parameters.contains("tablePath")) { + parameters.get("tablePath").get + } else { + CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession) + CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + } } catch { case ex: Exception => throw new Exception(s"Do not have $dbName and $tableName", ex) @@ -196,3 +206,50 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } } + +object CarbonSource { + + def createTableInfoFromParams(parameters: Map[String, String], + dataSchema: StructType, + dbName: String, + tableName: String): TableModel = { + val 
sqlParser = new CarbonSpark2SqlParser + val fields = sqlParser.getFields(dataSchema) + val map = scala.collection.mutable.Map[String, String]() + parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) } + val options = new CarbonOption(parameters) + val bucketFields = sqlParser.getBucketFields(map, fields, options) + sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName), + tableName, fields, Nil, map, bucketFields) + } + + /** + * Update spark catalog table with schema information in case of schema storage is hive metastore + * @param tableDesc + * @param sparkSession + * @return + */ + def updateCatalogTableWithCarbonSchema(tableDesc: CatalogTable, + sparkSession: SparkSession): CatalogTable = { + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val storageFormat = tableDesc.storage + val properties = storageFormat.properties + if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) { + val dbName: String = properties.getOrElse("dbName", + CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase + val tableName: String = properties.getOrElse("tableName", "").toLowerCase + val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName) + val tableInfo: TableInfo = TableNewProcessor(model) + val (tablePath, carbonSchemaString) = + metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession) + val map = CarbonUtil.convertToMultiStringMap(tableInfo) + properties.foreach(e => map.put(e._1, e._2)) + map.put("tablePath", tablePath) + // updating params + val updatedFormat = storageFormat.copy(properties = map.asScala.toMap) + tableDesc.copy(storage = updatedFormat) + } else { + tableDesc + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala index b08c113..0d5a821 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala @@ -63,16 +63,15 @@ private[sql] case class AlterTableAddColumns( // completion of 1st operation but as look up relation is called before it will have the // older carbon table and this can lead to inconsistent state in the system. Therefor look // up relation should be called after acquiring the lock - carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] .tableMeta.carbonTable // get the latest carbon table and check for column existence // read the latest schema file val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val thriftTableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore - .readSchemaFile(tableMetadataFile) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) val schemaConverter = new ThriftWrapperSchemaConverterImpl() val wrapperTableInfo = schemaConverter .fromExternalToWrapperTableInfo(thriftTableInfo, @@ -104,8 +103,8 @@ private[sql] case class AlterTableAddColumns( LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName") } catch { - case e: Exception => LOGGER - .error("Alter table add columns failed :" + e.getMessage) + 
case e: Exception => + LOGGER.error(e, "Alter table add columns failed") if (newCols.nonEmpty) { LOGGER.info("Cleaning up the dictionary files as alter table add operation failed") new AlterTableDropColumnRDD(sparkSession.sparkContext, @@ -147,9 +146,9 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR val newTableName = newTableIdentifier.table.toLowerCase LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName") LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName") + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val relation: CarbonRelation = - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession) + metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession) .asInstanceOf[CarbonRelation] if (relation == null) { LOGGER.audit(s"Rename table request has failed. " + @@ -168,15 +167,14 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR locks = AlterTableUtil .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)( sparkSession) - carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) + carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) .asInstanceOf[CarbonRelation].tableMeta.carbonTable // get the latest carbon table and check for column existence val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession) - .carbonMetastore.readSchemaFile(tableMetadataFile) + val tableMetadataFile = carbonTablePath.getPath + val tableInfo: org.apache.carbondata.format.TableInfo = + 
metastore.getThriftTableInfo(carbonTablePath)(sparkSession) val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) schemaEvolutionEntry.setTableName(newTableName) timeStamp = System.currentTimeMillis() @@ -193,15 +191,13 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR } } val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName, - newTableName, - carbonTable.getCarbonTableIdentifier.getTableId) - val newTablePath = CarbonEnv.getInstance(sparkSession).carbonMetastore - .updateTableSchema(newTableIdentifier, + newTableName, carbonTable.getCarbonTableIdentifier.getTableId) + val newTablePath = metastore.updateTableSchema(newTableIdentifier, + carbonTable.getCarbonTableIdentifier, tableInfo, schemaEvolutionEntry, carbonTable.getStorePath)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore - .removeTableFromMetadata(oldDatabaseName, oldTableName) + metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive .runSqlHive( s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName") @@ -213,8 +209,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName") LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") } catch { - case e: Exception => LOGGER - .error("Rename table failed: " + e.getMessage) + case e: Exception => + LOGGER.error(e, "Rename table failed: " + e.getMessage) if (carbonTable != null) { AlterTableUtil .revertRenameTableChanges(oldTableIdentifier, @@ -279,7 +275,8 @@ private[sql] case class AlterTableDropColumns( try { locks = AlterTableUtil .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) - carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + val metastore = 
CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] .tableMeta.carbonTable val partitionInfo = carbonTable.getPartitionInfo(tableName) @@ -329,9 +326,8 @@ private[sql] case class AlterTableDropColumns( // read the latest schema file val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession) - .carbonMetastore.readSchemaFile(tableMetadataFile) + val tableInfo: org.apache.carbondata.format.TableInfo = + metastore.getThriftTableInfo(carbonTablePath)(sparkSession) // maintain the deleted columns for schema evolution history var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]() val columnSchemaList = tableInfo.fact_table.table_columns.asScala @@ -393,7 +389,8 @@ private[sql] case class AlterTableDataTypeChange( try { locks = AlterTableUtil .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) - carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] .tableMeta.carbonTable val columnName = alterTableDataTypeChangeModel.columnName @@ -415,9 +412,7 @@ private[sql] case class AlterTableDataTypeChange( // read the latest schema file val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val tableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore - .readSchemaFile(tableMetadataFile) + val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) 
// maintain the added column for schema evolution history var addColumnSchema: ColumnSchema = null var deletedColumnSchema: ColumnSchema = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala index d2022be..609f39b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.hive.execution.command import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, ResetCommand, RunnableCommand, SetCommand} -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) extends RunnableCommand { @@ -29,16 +30,19 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) override def run(sparkSession: SparkSession): Seq[Row] = { val dbName = command.databaseName + var tablesInDB: Seq[TableIdentifier] = null + if (sparkSession.sessionState.catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) { + tablesInDB = sparkSession.sessionState.catalog.listTables(dbName) + } // DropHiveDB command will fail if cascade is false and one or more table exists in database val rows = command.run(sparkSession) - if (command.cascade) { - val tablesInDB = CarbonEnv.getInstance(sparkSession).carbonMetastore.getAllTables() 
- .filter(_.database.exists(_.equalsIgnoreCase(dbName))) + if (command.cascade && tablesInDB != null) { tablesInDB.foreach { tableName => CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession) } } - CarbonEnv.getInstance(sparkSession).carbonMetastore.dropDatabaseDirectory(dbName.toLowerCase) + CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession) + .carbonMetastore.storePath) rows } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala index 6087736..760cb06 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala @@ -16,13 +16,14 @@ */ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** @@ -61,7 +62,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { _, child: LogicalPlan, _, _) => ExecutedCommandExec(LoadTableByInsert(relation, child)) 
:: Nil case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) => - CarbonEnv.getInstance(sparkSession).carbonMetastore.createDatabaseDirectory(dbName) + CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession). + carbonMetastore.storePath) ExecutedCommandExec(createDb) :: Nil case drop@DropDatabaseCommand(dbName, ifExists, isCascade) => ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil @@ -127,6 +129,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { ExecutedCommandExec(CarbonSetCommand(set)) :: Nil case reset@ResetCommand => ExecutedCommandExec(CarbonResetCommand()) :: Nil + case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None) + if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER + && tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") => + val updatedCatalog = + CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession) + val cmd = + CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore) + ExecutedCommandExec(cmd) :: Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala index 2fccd0c..2c1de52 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala @@ -42,6 +42,7 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUp import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} 
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl +import org.apache.carbondata.hadoop.CarbonInputFormat import org.apache.carbondata.processing.exception.MultipleMatchingException import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType} import org.apache.carbondata.spark.DeleteDelataResultImpl @@ -107,7 +108,7 @@ private[sql] case class ProjectForDeleteCommand( CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) case e: Exception => - LOG.error("Exception in Delete data operation " + e.getMessage) + LOG.error(e, "Exception in Delete data operation " + e.getMessage) // ****** start clean up. // In case of failure , clean all related delete delta files CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) @@ -548,7 +549,7 @@ object deleteExecution { val (carbonInputFormat, job) = QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) - + CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) val keyRdd = deleteRdd.map({ row => val tupleId: String = row .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index ce66733..88e89ad 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -29,14 +29,13 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import 
org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation} +import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.util.FileUtils import org.codehaus.jackson.map.ObjectMapper import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer @@ -48,6 +47,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.format import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel} @@ -186,23 +186,24 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru } else { // Add Database to catalog and persist val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - // Need to fill partitioner class when we support partition - val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession) + val (tablePath, carbonSchemaString) = + catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession) if (createDSTable) { try { val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) cm.dimCols.foreach(f => 
fields(f.schemaOrdinal) = f) cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f) + sparkSession.sql( s"""CREATE TABLE $dbName.$tbName |(${ fields.map(f => f.rawSchema).mkString(",") }) |USING org.apache.spark.sql.CarbonSource""".stripMargin + - s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """) + s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + + s""""$tablePath" $carbonSchemaString) """) } catch { case e: Exception => val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) // call the drop table to delete the created table. - CarbonEnv.getInstance(sparkSession).carbonMetastore .dropTable(catalog.storePath, identifier)(sparkSession) @@ -234,9 +235,9 @@ case class DeleteLoadsById( def run(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore - .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName) - .map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.deleteLoadById( loadids, getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -259,9 +260,9 @@ case class DeleteLoadsByLoadDate( def run(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore - .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName) - .map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. 
+ tableMeta.carbonTable CarbonStore.deleteLoadByDate( loadDate, getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -279,13 +280,13 @@ object LoadTable { sqlContext: SQLContext, model: DictionaryLoadModel, noDictDimension: Array[CarbonDimension]): Unit = { - + val sparkSession = sqlContext.sparkSession val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation, model.table) - val schemaFilePath = carbonTablePath.getSchemaFilePath + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore // read TableInfo - val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath) + val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) // modify TableInfo val columns = tableInfo.getFact_table.getTable_columns @@ -294,23 +295,21 @@ object LoadTable { columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY) } } + val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) + entry.setTime_stamp(System.currentTimeMillis()) // write TableInfo - CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo) - - val catalog = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore + metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier, + carbonTablePath.getCarbonTableIdentifier, + tableInfo, entry, carbonTablePath.getPath)(sparkSession) // update the schema modified time - catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime( + metastore.updateAndTouchSchemasUpdatedTime( carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName)) - - // update Metadata - catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo, - model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath) + carbonLoadModel.getTableName) // update CarbonDataLoadSchema - val carbonTable = catalog.lookupRelation(Option(model.table.getDatabaseName), + val carbonTable = 
metastore.lookupRelation(Option(model.table.getDatabaseName), model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta .carbonTable carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) @@ -628,6 +627,14 @@ case class LoadTable( LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load") carbonLoadModel.setUseOnePass(false) } + // Create table and metadata folders if not exist + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(storePath, table.getCarbonTableIdentifier) + val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath + val fileType = FileFactory.getFileType(metadataDirectoryPath) + if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { + FileFactory.mkdirs(metadataDirectoryPath, fileType) + } if (carbonLoadModel.getUseOnePass) { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier @@ -691,8 +698,7 @@ case class LoadTable( server, dataFrame, updateModel) - } - else { + } else { val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) { val fields = dataFrame.get.schema.fields import org.apache.spark.sql.functions.udf @@ -726,6 +732,7 @@ case class LoadTable( } else { (dataFrame, dataFrame) } + GlobalDictionaryUtil .generateGlobalDictionary( sparkSession.sqlContext, @@ -794,9 +801,9 @@ private[sql] case class DeleteLoadByDate( def run(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore - .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName) - .map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. 
+ tableMeta.carbonTable CarbonStore.deleteLoadByDate( loadDate, getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -818,9 +825,7 @@ case class CleanFiles( val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore val relation = catalog .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation] - val carbonTable = catalog - .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName) - .map(_.carbonTable).getOrElse(null) + val carbonTable = relation.tableMeta.carbonTable CarbonStore.cleanFiles( getDB.getDatabaseName(databaseNameOp, sparkSession), tableName, @@ -839,9 +844,9 @@ case class ShowLoads( def run(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore - .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName) - .map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. 
+ tableMeta.carbonTable CarbonStore.showSegments( getDB.getDatabaseName(databaseNameOp, sparkSession), tableName, @@ -867,15 +872,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, catalog.checkSchemasModifiedTimeAndReloadTables() val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { - val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull - locksToBeAcquired foreach { - lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTable, lock) + locksToBeAcquired foreach { + lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock) } LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") - if (null != carbonTable) { - // clear driver B-tree and dictionary cache - ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) - } + CarbonEnv.getInstance(sparkSession).carbonMetastore .dropTable(storePath, identifier)(sparkSession) LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
