[CARBONDATA-1311] Added carbon store location to Spark warehouse, and extracted store location out of metastore
1. Changed the default store location to the Spark warehouse, so if the user does not provide a store location, the Spark warehouse location is chosen as the store location. 2. Changed the file metastore to avoid reading all schema files at once and keeping them in memory; instead implemented cache-based storage that reads a schema when a request comes. 3. Extracted the store location out of the metastore and refactored carbonmetastore. This closes #1176 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9d8abbdf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9d8abbdf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9d8abbdf Branch: refs/heads/metadata Commit: 9d8abbdf955d0f42ff4572995719efd561bea961 Parents: 8d3c9bf Author: Ravindra Pesala <[email protected]> Authored: Fri Jul 14 11:12:57 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Thu Jul 20 10:52:18 2017 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 - .../core/metadata/AbsoluteTableIdentifier.java | 5 +- .../carbondata/core/util/CarbonProperties.java | 4 - .../apache/carbondata/core/util/CarbonUtil.java | 28 +- .../spark/sql/common/util/QueryTest.scala | 6 +- .../carbondata/spark/load/CarbonLoaderUtil.java | 6 +- .../org/apache/carbondata/api/CarbonStore.scala | 3 +- .../spark/sql/test/TestQueryExecutor.scala | 1 - .../spark/thriftserver/CarbonThriftServer.scala | 4 +- .../org/apache/spark/sql/CarbonContext.scala | 12 +- .../sql/CarbonDatasourceHadoopRelation.scala | 3 +- .../apache/spark/sql/hive/CarbonMetastore.scala | 4 +- .../spark/thriftserver/CarbonThriftServer.scala | 4 +- .../carbondata/spark/util/CarbonSparkUtil.scala | 3 +- .../spark/sql/CarbonDataFrameWriter.scala | 2 +- .../spark/sql/CarbonDictionaryDecoder.scala | 4 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 12 +- .../org/apache/spark/sql/CarbonSession.scala | 13 +- 
.../org/apache/spark/sql/CarbonSource.scala | 54 ++- .../execution/CarbonLateDecodeStrategy.scala | 2 +- .../execution/command/AlterTableCommands.scala | 16 +- .../execution/command/CarbonHiveCommands.scala | 4 +- .../sql/execution/command/DDLStrategy.scala | 3 +- .../execution/command/carbonTableSchema.scala | 35 +- .../spark/sql/hive/CarbonFileMetastore.scala | 383 +++++++++---------- .../spark/sql/hive/CarbonHiveMetaStore.scala | 132 +------ .../apache/spark/sql/hive/CarbonMetaStore.scala | 54 ++- .../spark/sql/hive/CarbonSessionState.scala | 7 +- .../sql/test/Spark2TestQueryExecutor.scala | 2 +- .../org/apache/spark/util/AlterTableUtil.scala | 12 +- .../org/apache/spark/util/CleanFiles.scala | 3 +- .../org/apache/spark/util/Compaction.scala | 3 +- .../apache/spark/util/DeleteSegmentByDate.scala | 3 +- .../apache/spark/util/DeleteSegmentById.scala | 3 +- .../org/apache/spark/util/ShowSegments.scala | 3 +- .../org/apache/spark/util/TableLoader.scala | 3 +- .../apache/spark/util/CarbonCommandSuite.scala | 27 +- .../carbondata/processing/merger/TableMeta.java | 4 +- .../util/CarbonDataProcessorUtil.java | 6 +- 39 files changed, 410 insertions(+), 467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 55a292e..f6e5c62 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -71,10 +71,6 @@ public final class CarbonCommonConstants { @CarbonProperty public static final String SORT_SIZE = "carbon.sort.size"; 
/** - * default location of the carbon member, hierarchy and fact files - */ - public static final String STORE_LOCATION_DEFAULT_VAL = "../carbon.store"; - /** * CARDINALITY_INCREMENT_DEFAULT_VALUE */ public static final int CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL = 10; http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java index 3c39145..22faaf2 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java @@ -21,7 +21,6 @@ import java.io.Serializable; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.util.CarbonUtil; /** * identifier which will have store path and carbon table identifier @@ -68,9 +67,9 @@ public class AbsoluteTableIdentifier implements Serializable { return carbonTableIdentifier; } - public static AbsoluteTableIdentifier from(String dbName, String tableName) { + public static AbsoluteTableIdentifier from(String storePath, String dbName, String tableName) { CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, ""); - return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier); + return new AbsoluteTableIdentifier(storePath, identifier); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index d14b7ab..2f5874b 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -81,10 +81,6 @@ public final class CarbonProperties { } catch (IllegalAccessException e) { LOGGER.error("Illelagal access to declared field" + e.getMessage()); } - if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) { - carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION, - CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); - } validateBlockletSize(); validateNumCores(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index b9c164a..59f8cd8 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -730,15 +730,6 @@ public final class CarbonUtil { .startsWith("file://") || lowerPath.startsWith(ALLUXIO_PREFIX); } - public static String getCarbonStorePath() { - CarbonProperties prop = CarbonProperties.getInstance(); - if (null == prop) { - return null; - } - return prop.getProperty(CarbonCommonConstants.STORE_LOCATION, - CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); - } - /** * This method will check the existence of a file at a given path */ @@ -1800,6 +1791,25 @@ public final class CarbonUtil { } /** + * Removes schema from properties + * @param properties + * @return + */ + public static Map<String, String> removeSchemaFromMap(Map<String, String> properties) { + Map<String, String> newMap = 
new HashMap<>(); + newMap.putAll(properties); + String partsNo = newMap.get("carbonSchemaPartsNo"); + if (partsNo == null) { + return newMap; + } + int no = Integer.parseInt(partsNo); + for (int i = 0; i < no; i++) { + newMap.remove("carbonSchema" + i); + } + return newMap; + } + + /** * This method will read the schema file from a given path * * @param schemaFilePath http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala index 9912ec4..9926c57 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.TestQueryExecutor import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + class QueryTest extends PlanTest { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -85,7 +88,8 @@ class QueryTest extends PlanTest { val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext - val storeLocation = TestQueryExecutor.storeLocation + lazy val storeLocation = CarbonProperties.getInstance(). 
+ getProperty(CarbonCommonConstants.STORE_LOCATION) val resourcesPath = TestQueryExecutor.resourcesPath val integrationPath = TestQueryExecutor.integrationPath } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 5b603aa..5cef14a 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -213,8 +213,10 @@ public final class CarbonLoaderUtil { String tempLocationKey = CarbonDataProcessorUtil .getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), isCompactionFlow); // form local store location - final String localStoreLocation = CarbonProperties.getInstance() - .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); + final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey); + if (localStoreLocation == null) { + throw new RuntimeException("Store location not set for the key " + tempLocationKey); + } // submit local folder clean up in another thread so that main thread execution is not blocked ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 45719fc..dc37360 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -157,8 +157,9 @@ object CarbonStore { def isSegmentValid( dbName: String, tableName: String, + storePath: String, segmentId: String): Boolean = { - val identifier = AbsoluteTableIdentifier.from(dbName, tableName) + val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName) val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new SegmentStatusManager( identifier).getValidAndInvalidSegments http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala index b76bca3..149e3b1 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala @@ -53,7 +53,6 @@ object TestQueryExecutor { val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister] CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") - .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation) .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords") private def lookupQueryExecutor: Class[_] = { ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader) 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala index b8ba9f7..f8275d1 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala @@ -57,8 +57,8 @@ object CarbonThriftServer { "Using default Value and proceeding") Thread.sleep(30000) } - - val cc = new CarbonContext(sc, args.head) + val storePath = if (args.length > 0) args.head else null + val cc = new CarbonContext(sc, storePath) HiveThriftServer2.startWithContext(cc) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala index 1aeda95..da4b210 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala @@ -44,7 +44,7 @@ class CarbonContext( def this(sc: SparkContext) = { this(sc, - new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath, + null, new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath) } @@ -66,8 +66,14 @@ class CarbonContext( @transient override lazy val catalog = { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + val carbonProperties = 
CarbonProperties.getInstance() + if (storePath != null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + // In case if it is in carbon.properties for backward compatible + } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, + conf.getConfString("spark.sql.warehouse.dir")) + } new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 2fc93e6..71ef6a6 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -69,7 +69,8 @@ private[sql] case class CarbonDatasourceHadoopRelation( carbonTable.getDatabaseName, carbonTable.getFactTableName, CarbonSparkUtil.createSparkMeta(carbonTable), - new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable), + new TableMeta(carbonTable.getCarbonTableIdentifier, + paths.head, absIdentifier.getTablePath, carbonTable), None )(sqlContext) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index e8d3907..7790f59 100644 --- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -236,7 +236,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String, CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, - carbonTable) + null, carbonTable) } } }) @@ -281,7 +281,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String, tableInfo.setMetaDataFilepath(schemaMetadataPath) tableInfo.setStorePath(storePath) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(carbonTableIdentifier, storePath, + val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null, CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName)) val fileType = FileFactory.getFileType(schemaMetadataPath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala index aba6891..34ac940 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala @@ -53,9 +53,9 @@ object CarbonThriftServer { System.setProperty("carbon.properties.filepath", sparkConf.get("carbon.properties.filepath")) } - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, args.head) + val storePath = if (args.length > 0) 
args.head else null - val spark = builder.getOrCreateCarbonSession(args.head) + val spark = builder.getOrCreateCarbonSession(storePath) val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000") try { Thread.sleep(Integer.parseInt(warmUpTime)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index d1d3015..de7f3fb 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -48,7 +48,8 @@ object CarbonSparkUtil { def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = { val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val table = CarbonTable.buildFromTableInfo(tableInfo) - val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table) + val meta = new TableMeta(identifier.getCarbonTableIdentifier, + identifier.getStorePath, tablePath, table) CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName, CarbonSparkUtil.createSparkMeta(table), meta) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index 1054c62..805a421 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { */ private def loadTempCSV(options: CarbonOption): Unit = { // temporary solution: write to csv file, then load the csv into carbon - val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath + val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR) .append("tempCSV") .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 33091aa..43f6a21 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -67,7 +67,7 @@ case class CarbonDictionaryDecoder( override def doExecute(): RDD[InternalRow] = { attachTree(this, "execute") { - val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + val storePath = CarbonEnv.getInstance(sparkSession).storePath val absoluteTableIdentifiers = relations.map { relation => val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) @@ -121,7 +121,7 @@ case class CarbonDictionaryDecoder( override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - 
val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + val storePath = CarbonEnv.getInstance(sparkSession).storePath val absoluteTableIdentifiers = relations.map { relation => val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index d19eb39..9d10ea0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -40,6 +40,8 @@ class CarbonEnv { var carbonSessionInfo: CarbonSessionInfo = _ + var storePath: String = _ + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) // set readsupport class global so that the executor can get it. 
@@ -61,10 +63,14 @@ class CarbonEnv { // add session params after adding DefaultCarbonParams config.addDefaultCarbonSessionParams() carbonMetastore = { - val storePath = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) + val properties = CarbonProperties.getInstance() + storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION) + if (storePath == null) { + storePath = sparkSession.conf.get("spark.sql.warehouse.dir") + properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + } LOGGER.info(s"carbon env initial: $storePath") - CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath) + CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf) } CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") initialized = true http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index b436891..7390cf3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -65,7 +65,7 @@ object CarbonSession { def getOrCreateCarbonSession(): SparkSession = { getOrCreateCarbonSession( - new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath, + null, new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath) } @@ -145,9 +145,14 @@ object CarbonSession { } sc } - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + val carbonProperties = CarbonProperties.getInstance() + if (storePath != null) { + 
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + // In case if it is in carbon.properties for backward compatible + } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, + sparkContext.conf.get("spark.sql.warehouse.dir")) + } session = new CarbonSession(sparkContext) options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } SparkSession.setDefaultSession(session) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 498ea03..bec163b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -25,15 +25,19 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.CarbonLateDecodeStrategy -import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.{CreateTable, TableModel, TableNewProcessor} +import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DecimalType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import 
org.apache.carbondata.core.metadata.schema import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -146,7 +150,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String], - dataSchema: StructType): String = { + dataSchema: StructType) = { val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase @@ -158,7 +162,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } else { CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(Option(dbName), tableName)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName" } } catch { case ex: NoSuchTableException => @@ -168,7 +172,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider dbName, tableName) CreateTable(cm, false).run(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + getPathForTable(sparkSession, dbName, tableName, parameters) case ex: Exception => throw new Exception("do not have dbname and tablename for carbon table", ex) } @@ -195,9 +199,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider if (parameters.contains("tablePath")) { parameters.get("tablePath").get } else { - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + val relation = 
CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + relation.tableMeta.tablePath } } catch { case ex: Exception => @@ -234,22 +238,46 @@ object CarbonSource { val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore val storageFormat = tableDesc.storage val properties = storageFormat.properties - if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) { + if (!properties.contains("carbonSchemaPartsNo")) { val dbName: String = properties.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase val tableName: String = properties.getOrElse("tableName", "").toLowerCase val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName) val tableInfo: TableInfo = TableNewProcessor(model) - val (tablePath, carbonSchemaString) = - metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession) - val map = CarbonUtil.convertToMultiStringMap(tableInfo) + val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName + val schemaEvolutionEntry = new schema.SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) + tableInfo.getFactTable.getSchemaEvalution. 
+ getSchemaEvolutionEntryList.add(schemaEvolutionEntry) + val map = if (metaStore.isReadFromHiveMetaStore) { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) + val schemaMetadataPath = + CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) + tableInfo.setMetaDataFilepath(schemaMetadataPath) + tableInfo.setStorePath(tableIdentifier.getStorePath) + CarbonUtil.convertToMultiStringMap(tableInfo) + } else { + metaStore.saveToDisk(tableInfo, tablePath) + new java.util.HashMap[String, String]() + } properties.foreach(e => map.put(e._1, e._2)) map.put("tablePath", tablePath) // updating params val updatedFormat = storageFormat.copy(properties = map.asScala.toMap) tableDesc.copy(storage = updatedFormat) } else { - tableDesc + val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava) + if (!metaStore.isReadFromHiveMetaStore) { + // save to disk + metaStore.saveToDisk(tableInfo, properties.get("tablePath").get) + // remove schema string from map as we don't store carbon schema to hive metastore + val map = CarbonUtil.removeSchemaFromMap(properties.asJava) + val updatedFormat = storageFormat.copy(properties = map.asScala.toMap) + tableDesc.copy(storage = updatedFormat) + } else { + tableDesc + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 1cc6668..33bba8f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -102,7 +102,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { CarbonAliasDecoderRelation(), rdd, output, - CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath, + CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath, table.carbonTable.getTableInfo.serialize()) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala index 0d5a821..17e456d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} import org.apache.spark.util.AlterTableUtil @@ -29,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import 
org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -167,11 +168,12 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR locks = AlterTableUtil .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)( sparkSession) - carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) - .asInstanceOf[CarbonRelation].tableMeta.carbonTable + val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) + .asInstanceOf[CarbonRelation].tableMeta + carbonTable = tableMeta.carbonTable // get the latest carbon table and check for column existence - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, - carbonTable.getCarbonTableIdentifier) + val carbonTablePath = CarbonStorePath. + getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)) val tableMetadataFile = carbonTablePath.getPath val tableInfo: org.apache.carbondata.format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) @@ -196,7 +198,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR carbonTable.getCarbonTableIdentifier, tableInfo, schemaEvolutionEntry, - carbonTable.getStorePath)(sparkSession) + tableMeta.tablePath)(sparkSession) metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive .runSqlHive( @@ -206,6 +208,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" + s"('tableName'='$newTableName', " + s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')") + sparkSession.catalog.refreshTable(TableIdentifier(newTableName, + Some(oldDatabaseName)).quotedString) LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName") 
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala index 609f39b..2731104 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala @@ -41,8 +41,8 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession) } } - CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession) - .carbonMetastore.storePath) + CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, + CarbonEnv.getInstance(sparkSession).storePath) rows } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala index 760cb06..18d2dc7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala @@ -62,8 +62,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { _, child: LogicalPlan, _, _) => ExecutedCommandExec(LoadTableByInsert(relation, 
child)) :: Nil case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) => - CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession). - carbonMetastore.storePath) + CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath) ExecutedCommandExec(createDb) :: Nil case drop@DropDatabaseCommand(dbName, ifExists, isCascade) => ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index cc18fa3..eeb2022 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension @@ -156,7 +156,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) 
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.storePath) + carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath) var storeLocation = CarbonProperties.getInstance .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, @@ -196,7 +196,9 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru } override def processSchema(sparkSession: SparkSession): Seq[Row] = { - CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables() + val storePath = CarbonEnv.getInstance(sparkSession).storePath + CarbonEnv.getInstance(sparkSession).carbonMetastore. + checkSchemasModifiedTimeAndReloadTables(storePath) val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession) val tbName = cm.tableName @@ -218,10 +220,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru sys.error(s"Table [$tbName] already exists under database [$dbName]") } } else { + val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName) // Add Database to catalog and persist val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val (tablePath, carbonSchemaString) = - catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession) + val tablePath = tableIdentifier.getTablePath + val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath) if (createDSTable) { try { val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) @@ -239,7 +242,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) // call the drop table to delete the created table. 
CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(catalog.storePath, identifier)(sparkSession) + .dropTable(tablePath, identifier)(sparkSession) LOGGER.audit(s"Table creation with Database name [$dbName] " + s"and Table name [$tbName] failed") @@ -332,9 +335,7 @@ object LoadTable { tableInfo, entry, carbonTablePath.getPath)(sparkSession) // update the schema modified time - metastore.updateAndTouchSchemasUpdatedTime( - carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName) + metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation) // update CarbonDataLoadSchema val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName), @@ -526,7 +527,7 @@ case class LoadTable( val carbonLoadModel = new CarbonLoadModel() carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.storePath) + carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath) val table = relation.tableMeta.carbonTable carbonLoadModel.setTableName(table.getFactTableName) @@ -882,7 +883,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - catalog.checkSchemasModifiedTimeAndReloadTables() + val tableIdentifier = + AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, + dbName.toLowerCase, tableName.toLowerCase) + catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath) val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { locksToBeAcquired foreach { @@ -891,7 +895,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, LOGGER.audit(s"Deleting table [$tableName] under 
database [$dbName]") CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(catalog.storePath, identifier)(sparkSession) + .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession) LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") } catch { case ex: Exception => @@ -910,10 +914,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, override def processData(sparkSession: SparkSession): Seq[Row] = { // delete the table folder val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession) - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val metadataFilePath = CarbonStorePath - .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath + val tableIdentifier = + AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName) + val metadataFilePath = + CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath val fileType = FileFactory.getFileType(metadataFilePath) if (FileFactory.isFileExist(metadataFilePath, fileType)) { val file = FileFactory.getCarbonFile(metadataFilePath, fileType) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 549841b..2407054 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.{RuntimeConfig, 
SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.impl.FileFactory.FileType import org.apache.carbondata.core.fileoperations.FileWriteOperation import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.schema +import org.apache.carbondata.core.metadata.schema.table import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { } } -class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore { +class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { @transient val LOGGER = 
LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") @@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca System.nanoTime() + "" } - lazy val metadata = loadMetadata(storePath, nextQueryId) + val metadata = MetaData(new ArrayBuffer[TableMeta]()) /** @@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca override def createCarbonRelation(parameters: Map[String, String], absIdentifier: AbsoluteTableIdentifier, sparkSession: SparkSession): CarbonRelation = { - lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName, - Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession) - .asInstanceOf[CarbonRelation] + val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName + val tableName = absIdentifier.getCarbonTableIdentifier.getTableName + val tables = getTableFromMetadataCache(database, tableName) + tables match { + case Some(t) => + CarbonRelation(database, tableName, + CarbonSparkUtil.createSparkMeta(t.carbonTable), t) + case None => + readCarbonSchema(absIdentifier) match { + case Some(meta) => + CarbonRelation(database, tableName, + CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta) + case None => + throw new NoSuchTableException(database, tableName) + } + } } def lookupRelation(dbName: Option[String], tableName: String) @@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } - def lookupRelation(tableIdentifier: TableIdentifier) + override def lookupRelation(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): LogicalPlan = { - checkSchemasModifiedTimeAndReloadTables() val database = tableIdentifier.database.getOrElse( - sparkSession.catalog.currentDatabase - ) - val tables = getTableFromMetadata(database, tableIdentifier.table, true) - tables match { - case Some(t) 
=> - CarbonRelation(database, tableIdentifier.table, - CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head) - case None => - throw new NoSuchTableException(database, tableIdentifier.table) + sparkSession.catalog.currentDatabase) + val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { + case SubqueryAlias(_, + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), + _) => + carbonDatasourceHadoopRelation.carbonRelation + case LogicalRelation( + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + carbonDatasourceHadoopRelation.carbonRelation + case _ => throw new NoSuchTableException(database, tableIdentifier.table) } + relation } /** @@ -123,8 +138,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param tableName * @return */ - def getTableFromMetadata(database: String, - tableName: String, readStore: Boolean = false): Option[TableMeta] = { + def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = { metadata.tablesMeta .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) @@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca tableExists(TableIdentifier(table, databaseOp))(sparkSession) } - def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - checkSchemasModifiedTimeAndReloadTables() - val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val tables = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table)) - tables.nonEmpty - } - - def loadMetadata(metadataPath: String, queryId: String): MetaData = { - val recorder = 
CarbonTimeStatisticsFactory.createDriverRecorder() - val statistic = new QueryStatistic() - // creating zookeeper instance once. - // if zookeeper is configured as carbon lock type. - val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null) - if (null != zookeeperurl) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) - } - if (metadataPath == null) { - return null - } - // if no locktype is configured and store type is HDFS set HDFS lock as default - if (null == CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.LOCK_TYPE) && - FileType.HDFS == FileFactory.getFileType(metadataPath)) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.LOCK_TYPE, - CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS - ) - LOGGER.info("Default lock type HDFSLOCK is configured") + override def tableExists(tableIdentifier: TableIdentifier) + (sparkSession: SparkSession): Boolean = { + try { + lookupRelation(tableIdentifier)(sparkSession) + } catch { + case e: Exception => + return false } - val fileType = FileFactory.getFileType(metadataPath) - val metaDataBuffer = new ArrayBuffer[TableMeta] - fillMetaData(metadataPath, fileType, metaDataBuffer) - updateSchemasUpdatedTime(readSchemaFileSystemTime("", "")) - statistic.addStatistics(QueryStatisticsConstants.LOAD_META, - System.currentTimeMillis()) - recorder.recordStatisticsForDriver(statistic, queryId) - MetaData(metaDataBuffer) + true } - private def fillMetaData(basePath: String, fileType: FileType, - metaDataBuffer: ArrayBuffer[TableMeta]): Unit = { - val databasePath = basePath // + "/schemas" - try { - if (FileFactory.isFileExist(databasePath, fileType)) { - val file = FileFactory.getCarbonFile(databasePath, fileType) - val databaseFolders = file.listFiles() - - databaseFolders.foreach(databaseFolder => { - if (databaseFolder.isDirectory) { - val dbName = databaseFolder.getName - val tableFolders = databaseFolder.listFiles() - - 
tableFolders.foreach(tableFolder => { - if (tableFolder.isDirectory) { - val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName, - tableFolder.getName, UUID.randomUUID().toString) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath, - carbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableName = tableFolder.getName - val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName - val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath) - val schemaFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath - wrapperTableInfo.setStorePath(storePath) - wrapperTableInfo - .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, - carbonTable) - } - } - }) - } - }) - } else { - // Create folders and files. - FileFactory.mkdirs(databasePath, fileType) - } - } catch { - case s: java.io.FileNotFoundException => - s.printStackTrace() - // Create folders and files. 
- FileFactory.mkdirs(databasePath, fileType) + private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = { + val dbName = identifier.getCarbonTableIdentifier.getDatabaseName + val tableName = identifier.getCarbonTableIdentifier.getTableName + val storePath = identifier.getStorePath + val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(), + tableName.toLowerCase(), UUID.randomUUID().toString) + val carbonTablePath = + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + val tableMetadataFile = carbonTablePath.getSchemaFilePath + val fileType = FileFactory.getFileType(tableMetadataFile) + if (FileFactory.isFileExist(tableMetadataFile, fileType)) { + val tableUniqueName = dbName + "_" + tableName + val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath) + val schemaFilePath = CarbonStorePath + .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath + wrapperTableInfo.setStorePath(storePath) + wrapperTableInfo + .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) + CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier, + identifier.getStorePath, + identifier.getTablePath, + carbonTable) + metadata.tablesMeta += tableMeta + Some(tableMeta) + } else { + None } } @@ -238,15 +201,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param newTableIdentifier * @param thriftTableInfo * @param schemaEvolutionEntry - * @param carbonStorePath + * @param tablePath * @param sparkSession */ def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: 
CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - carbonStorePath: String) - (sparkSession: SparkSession): String = { + tablePath: String) (sparkSession: SparkSession): String = { + val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val schemaConverter = new ThriftWrapperSchemaConverterImpl if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) @@ -255,11 +218,19 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca .fromExternalToWrapperTableInfo(thriftTableInfo, newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, - carbonStorePath) - createSchemaThriftFile(wrapperTableInfo, + absoluteTableIdentifier.getStorePath) + val identifier = + new CarbonTableIdentifier(newTableIdentifier.getDatabaseName, + newTableIdentifier.getTableName, + wrapperTableInfo.getFactTable.getTableId) + val path = createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, + identifier) + addTableCache(wrapperTableInfo, + AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath, newTableIdentifier.getDatabaseName, - newTableIdentifier.getTableName)(sparkSession) + newTableIdentifier.getTableName)) + path } /** @@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * * @param carbonTableIdentifier * @param thriftTableInfo - * @param carbonStorePath + * @param tablePath * @param sparkSession */ def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, - carbonStorePath: String) - (sparkSession: SparkSession): String = { + tablePath: String)(sparkSession: SparkSession): String = { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val schemaConverter = new ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter 
.fromExternalToWrapperTableInfo(thriftTableInfo, carbonTableIdentifier.getDatabaseName, carbonTableIdentifier.getTableName, - carbonStorePath) + tableIdentifier.getStorePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) - createSchemaThriftFile(wrapperTableInfo, + wrapperTableInfo.setStorePath(tableIdentifier.getStorePath) + val path = createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName)(sparkSession) + tableIdentifier.getCarbonTableIdentifier) + addTableCache(wrapperTableInfo, tableIdentifier) + path } @@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * Load CarbonTable from wrapper tableInfo * */ - def createTableFromThrift( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = { - if (tableExists(tableName, Some(dbName))(sparkSession)) { - sys.error(s"Table [$tableName] already exists under Database [$dbName]") - } - val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime) + def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) { val schemaConverter = new ThriftWrapperSchemaConverterImpl + val dbName = tableInfo.getDatabaseName + val tableName = tableInfo.getFactTable.getTableName val thriftTableInfo = schemaConverter .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName) - thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history - .add(schemaEvolutionEntry) - val carbonTablePath = createSchemaThriftFile(tableInfo, + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + tableInfo.setStorePath(identifier.getStorePath) + createSchemaThriftFile(tableInfo, thriftTableInfo, - dbName, - tableName)(sparkSession) + identifier.getCarbonTableIdentifier) 
LOGGER.info(s"Table $tableName for Database $dbName created successfully.") - (carbonTablePath, "") + } + + /** + * Generates schema string from TableInfo + */ + override def generateTableSchemaString(tableInfo: schema.table.TableInfo, + tablePath: String): String = { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) + val schemaMetadataPath = + CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) + tableInfo.setMetaDataFilepath(schemaMetadataPath) + tableInfo.setStorePath(tableIdentifier.getStorePath) + val schemaEvolutionEntry = new schema.SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) + tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) + removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName) + CarbonMetadata.getInstance().loadTableMetadata(tableInfo) + addTableCache(tableInfo, tableIdentifier) + CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",") } /** @@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * * @param tableInfo * @param thriftTableInfo - * @param dbName - * @param tableName - * @param sparkSession * @return */ - private def createSchemaThriftFile( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - dbName: String, tableName: String) - (sparkSession: SparkSession): String = { - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, - tableInfo.getFactTable.getTableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + private def createSchemaThriftFile(tableInfo: schema.table.TableInfo, + thriftTableInfo: TableInfo, + carbonTableIdentifier: CarbonTableIdentifier): String = { + val carbonTablePath = 
CarbonStorePath. + getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier) val schemaFilePath = carbonTablePath.getSchemaFilePath val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(storePath) val fileType = FileFactory.getFileType(schemaMetadataPath) if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { FileFactory.mkdirs(schemaMetadataPath, fileType) @@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca thriftWriter.open(FileWriteOperation.OVERWRITE) thriftWriter.write(thriftTableInfo) thriftWriter.close() - removeTableFromMetadata(dbName, tableName) + updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath)) + carbonTablePath.getPath + } + + protected def addTableCache(tableInfo: table.TableInfo, + absoluteTableIdentifier: AbsoluteTableIdentifier) = { + val identifier = absoluteTableIdentifier.getCarbonTableIdentifier + CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName) + removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(carbonTableIdentifier, storePath, - CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)) + val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath, + absoluteTableIdentifier.getTablePath, + CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)) metadata.tablesMeta += tableMeta - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - carbonTablePath.getPath } /** @@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param tableName */ def removeTableFromMetadata(dbName: String, tableName: String): Unit = { - val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName) + val 
metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName) metadataToBeRemoved match { case Some(tableMeta) => metadata.tablesMeta -= tableMeta CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) case None => - LOGGER.debug(s"No entry for table $tableName in database $dbName") + if (LOGGER.isDebugEnabled) { + LOGGER.debug(s"No entry for table $tableName in database $dbName") + } } } @@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val tableName = tableIdentifier.table.toLowerCase - - val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath, - new CarbonTableIdentifier(dbName, tableName, "")).getPath - - val fileType = FileFactory.getFileType(tablePath) - FileFactory.isFileExist(tablePath, fileType) + try { + val tablePath = lookupRelation(tableIdentifier)(sparkSession). 
+ asInstanceOf[CarbonRelation].tableMeta.tablePath + val fileType = FileFactory.getFileType(tablePath) + FileFactory.isFileExist(tablePath, fileType) + } catch { + case e: Exception => + false + } } - def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier) + def dropTable(tablePath: String, tableIdentifier: TableIdentifier) (sparkSession: SparkSession) { val dbName = tableIdentifier.database.get val tableName = tableIdentifier.table - - val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath, - new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache @@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca if (FileFactory.isFileExist(metadataFilePath, fileType)) { // while drop we should refresh the schema modified time so that if any thing has changed // in the other beeline need to update. 
- checkSchemasModifiedTimeAndReloadTables - - val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, - tableIdentifier.table) - metadataToBeRemoved match { - case Some(tableMeta) => - metadata.tablesMeta -= tableMeta - CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - case None => - LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName") - } + checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath) + + removeTableFromMetadata(dbName, tableName) + updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath)) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables sparkSession.sessionState.catalog.refreshTable(tableIdentifier) } } - private def getTimestampFileAndType(databaseName: String, tableName: String) = { - val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE + private def getTimestampFileAndType(basePath: String) = { + val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE val timestampFileType = FileFactory.getFileType(timestampFile) (timestampFile, timestampFileType) } @@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca tableModifiedTimeStore.put("default", timeStamp) } - def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) { - updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName)) + def updateAndTouchSchemasUpdatedTime(basePath: String) { + updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath)) } - /** - * This method will read the timestamp of empty schema file - * - * @param databaseName - * @param tableName - * @return - */ - private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, 
timestampFileType) = getTimestampFileAndType(databaseName, tableName) - if (FileFactory.isFileExist(timestampFile, timestampFileType)) { - FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime - } else { - System.currentTimeMillis() - } - } /** * This method will check and create an empty schema timestamp file * - * @param databaseName - * @param tableName * @return */ - private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) + private def touchSchemaFileSystemTime(basePath: String): Long = { + val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath) if (!FileFactory.isFileExist(timestampFile, timestampFileType)) { - LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName") + LOGGER.audit(s"Creating timestamp file for $basePath") FileFactory.createNewFile(timestampFile, timestampFileType) } val systemTime = System.currentTimeMillis() @@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca systemTime } - def checkSchemasModifiedTimeAndReloadTables() { - val (timestampFile, timestampFileType) = getTimestampFileAndType("", "") + def checkSchemasModifiedTimeAndReloadTables(storePath: String) { + val (timestampFile, timestampFileType) = + getTimestampFileAndType(storePath) if (FileFactory.isFileExist(timestampFile, timestampFileType)) { if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType). 
getLastModifiedTime == @@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca } private def refreshCache() { - metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta + metadata.tablesMeta.clear() } override def isReadFromHiveMetaStore: Boolean = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index a8f92ce..c328130 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -16,21 +16,15 @@ */ package org.apache.spark.sql.hive -import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession} +import org.apache.spark.sql.{RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree -import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.format @@ -41,12 +35,10 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil /** * Metastore to store carbonschema in hive */ -class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) - extends CarbonFileMetastore(conf, storePath) { +class CarbonHiveMetaStore(conf: RuntimeConfig) extends CarbonFileMetastore(conf) { override def isReadFromHiveMetaStore: Boolean = true - /** * Create spark session from paramters. * @@ -61,7 +53,7 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) if (info != null) { val table = CarbonTable.buildFromTableInfo(info) val meta = new TableMeta(table.getCarbonTableIdentifier, - table.getStorePath, table) + absIdentifier.getStorePath, absIdentifier.getTablePath, table) CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName, CarbonSparkUtil.createSparkMeta(table), meta) } else { @@ -69,111 +61,30 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) } } - override def lookupRelation(tableIdentifier: TableIdentifier) - (sparkSession: SparkSession): LogicalPlan = { - val database = tableIdentifier.database.getOrElse( - sparkSession.catalog.currentDatabase) - val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { - case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => - carbonDatasourceHadoopRelation.carbonRelation - case LogicalRelation( - carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => - carbonDatasourceHadoopRelation.carbonRelation - case _ => throw new NoSuchTableException(database, tableIdentifier.table) - } - relation - } - - /** - * This method will search for a table in the catalog metadata - * - * @param database - * @param tableName - * 
@return - */ - override def getTableFromMetadata(database: String, - tableName: String, - readStore: Boolean): Option[TableMeta] = { - if (!readStore) { - None - } else { - super.getTableFromMetadata(database, tableName, readStore) - } - } - - override def tableExists(tableIdentifier: TableIdentifier) - (sparkSession: SparkSession): Boolean = { - try { - lookupRelation(tableIdentifier)(sparkSession) - } catch { - case e: Exception => - return false - } - true - } - - override def loadMetadata(metadataPath: String, - queryId: String): MetaData = { - MetaData(new ArrayBuffer[TableMeta]) - } - - - /** - * - * Prepare Thrift Schema from wrapper TableInfo and write to Schema file. - * Load CarbonTable from wrapper tableInfo - * - */ - override def createTableFromThrift(tableInfo: TableInfo, dbName: String, - tableName: String)(sparkSession: SparkSession): (String, String) = { - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, - tableInfo.getFactTable.getTableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) - val schemaMetadataPath = - CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) - tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(storePath) - val schemaEvolutionEntry = new schema.SchemaEvolutionEntry - schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) - tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) - removeTableFromMetadata(dbName, tableName) - CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - (carbonTablePath.getPath, CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")) - } - - /** - * This method will remove the table meta from catalog metadata array - * - * @param dbName - * @param tableName - */ - override def removeTableFromMetadata(dbName: String, - tableName: String): Unit = { - // do nothing - } override def isTablePathExists(tableIdentifier: 
TableIdentifier) (sparkSession: SparkSession): Boolean = { tableExists(tableIdentifier)(sparkSession) } - override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier) + override def dropTable(tablePath: String, tableIdentifier: TableIdentifier) (sparkSession: SparkSession): Unit = { val dbName = tableIdentifier.database.get val tableName = tableIdentifier.table + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) } + checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath) + removeTableFromMetadata(dbName, tableName) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables sparkSession.sessionState.catalog.refreshTable(tableIdentifier) } - override def checkSchemasModifiedTimeAndReloadTables(): Unit = { + override def checkSchemasModifiedTimeAndReloadTables(storePath: String) { // do nothing now } @@ -200,23 +111,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) * @param newTableIdentifier * @param thriftTableInfo * @param schemaEvolutionEntry - * @param carbonStorePath * @param sparkSession */ override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - carbonStorePath: String) + tablePath: String) (sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) } updateHiveMetaStore(newTableIdentifier, oldTableIdentifier, 
thriftTableInfo, - carbonStorePath, + identifier.getStorePath, sparkSession, schemaConverter) } @@ -232,23 +143,20 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, carbonStorePath) - wrapperTableInfo.setStorePath(storePath) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier) + wrapperTableInfo.setStorePath(carbonStorePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier) val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath) val dbName = oldTableIdentifier.getDatabaseName val tableName = oldTableIdentifier.getTableName - val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName, - wrapperTableInfo.getFactTable.getTableId) val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive( s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString) - removeTableFromMetadata(wrapperTableInfo.getDatabaseName, - wrapperTableInfo.getFactTable.getTableName) + removeTableFromMetadata(dbName, tableName) CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath + CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath } /** @@ -256,21 +164,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) * * @param carbonTableIdentifier * @param thriftTableInfo - * @param carbonStorePath * @param sparkSession */ override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, - carbonStorePath: String) + tablePath: String) 
(sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) updateHiveMetaStore(carbonTableIdentifier, carbonTableIdentifier, thriftTableInfo, - carbonStorePath, + identifier.getStorePath, sparkSession, schemaConverter) } + + }
