Changed the default store location to spark-warehouse and refactored code to use common code
Fixed style Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/34d2870a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/34d2870a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/34d2870a Branch: refs/heads/metadata Commit: 34d2870aebd10adfacd56e5892a9b1149fb6a08e Parents: 8d3c9bf Author: Ravindra Pesala <[email protected]> Authored: Fri Jul 14 11:12:57 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Thu Jul 20 10:45:22 2017 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 - .../core/metadata/AbsoluteTableIdentifier.java | 5 +- .../carbondata/core/util/CarbonProperties.java | 4 - .../apache/carbondata/core/util/CarbonUtil.java | 28 +- .../spark/sql/common/util/QueryTest.scala | 6 +- .../carbondata/spark/load/CarbonLoaderUtil.java | 6 +- .../org/apache/carbondata/api/CarbonStore.scala | 3 +- .../spark/sql/test/TestQueryExecutor.scala | 1 - .../spark/thriftserver/CarbonThriftServer.scala | 4 +- .../org/apache/spark/sql/CarbonContext.scala | 12 +- .../sql/CarbonDatasourceHadoopRelation.scala | 3 +- .../apache/spark/sql/hive/CarbonMetastore.scala | 4 +- .../spark/thriftserver/CarbonThriftServer.scala | 4 +- .../carbondata/spark/util/CarbonSparkUtil.scala | 3 +- .../spark/sql/CarbonDataFrameWriter.scala | 2 +- .../spark/sql/CarbonDictionaryDecoder.scala | 4 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 12 +- .../org/apache/spark/sql/CarbonSession.scala | 13 +- .../org/apache/spark/sql/CarbonSource.scala | 54 ++- .../execution/CarbonLateDecodeStrategy.scala | 2 +- .../execution/command/AlterTableCommands.scala | 16 +- .../execution/command/CarbonHiveCommands.scala | 4 +- .../sql/execution/command/DDLStrategy.scala | 3 +- .../execution/command/carbonTableSchema.scala | 35 +- .../spark/sql/hive/CarbonFileMetastore.scala | 383 +++++++++---------- 
.../spark/sql/hive/CarbonHiveMetaStore.scala | 132 +------ .../apache/spark/sql/hive/CarbonMetaStore.scala | 54 ++- .../spark/sql/hive/CarbonSessionState.scala | 7 +- .../sql/test/Spark2TestQueryExecutor.scala | 2 +- .../org/apache/spark/util/AlterTableUtil.scala | 12 +- .../org/apache/spark/util/CleanFiles.scala | 3 +- .../org/apache/spark/util/Compaction.scala | 3 +- .../apache/spark/util/DeleteSegmentByDate.scala | 3 +- .../apache/spark/util/DeleteSegmentById.scala | 3 +- .../org/apache/spark/util/ShowSegments.scala | 3 +- .../org/apache/spark/util/TableLoader.scala | 3 +- .../apache/spark/util/CarbonCommandSuite.scala | 27 +- .../carbondata/processing/merger/TableMeta.java | 4 +- .../util/CarbonDataProcessorUtil.java | 6 +- 39 files changed, 410 insertions(+), 467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 55a292e..f6e5c62 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -71,10 +71,6 @@ public final class CarbonCommonConstants { @CarbonProperty public static final String SORT_SIZE = "carbon.sort.size"; /** - * default location of the carbon member, hierarchy and fact files - */ - public static final String STORE_LOCATION_DEFAULT_VAL = "../carbon.store"; - /** * CARDINALITY_INCREMENT_DEFAULT_VALUE */ public static final int CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL = 10; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java index 3c39145..22faaf2 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java @@ -21,7 +21,6 @@ import java.io.Serializable; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.util.CarbonUtil; /** * identifier which will have store path and carbon table identifier @@ -68,9 +67,9 @@ public class AbsoluteTableIdentifier implements Serializable { return carbonTableIdentifier; } - public static AbsoluteTableIdentifier from(String dbName, String tableName) { + public static AbsoluteTableIdentifier from(String storePath, String dbName, String tableName) { CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, ""); - return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier); + return new AbsoluteTableIdentifier(storePath, identifier); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index d14b7ab..2f5874b 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ 
-81,10 +81,6 @@ public final class CarbonProperties { } catch (IllegalAccessException e) { LOGGER.error("Illelagal access to declared field" + e.getMessage()); } - if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) { - carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION, - CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); - } validateBlockletSize(); validateNumCores(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index b9c164a..59f8cd8 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -730,15 +730,6 @@ public final class CarbonUtil { .startsWith("file://") || lowerPath.startsWith(ALLUXIO_PREFIX); } - public static String getCarbonStorePath() { - CarbonProperties prop = CarbonProperties.getInstance(); - if (null == prop) { - return null; - } - return prop.getProperty(CarbonCommonConstants.STORE_LOCATION, - CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); - } - /** * This method will check the existence of a file at a given path */ @@ -1800,6 +1791,25 @@ public final class CarbonUtil { } /** + * Removes schema from properties + * @param properties + * @return + */ + public static Map<String, String> removeSchemaFromMap(Map<String, String> properties) { + Map<String, String> newMap = new HashMap<>(); + newMap.putAll(properties); + String partsNo = newMap.get("carbonSchemaPartsNo"); + if (partsNo == null) { + return newMap; + } + int no = Integer.parseInt(partsNo); + for (int i = 0; i < no; i++) { + newMap.remove("carbonSchema" + i); + } + return newMap; + } + + /** * This method will read the schema file from a given 
path * * @param schemaFilePath http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala index 9912ec4..9926c57 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.TestQueryExecutor import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + class QueryTest extends PlanTest { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -85,7 +88,8 @@ class QueryTest extends PlanTest { val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext - val storeLocation = TestQueryExecutor.storeLocation + lazy val storeLocation = CarbonProperties.getInstance(). 
+ getProperty(CarbonCommonConstants.STORE_LOCATION) val resourcesPath = TestQueryExecutor.resourcesPath val integrationPath = TestQueryExecutor.integrationPath } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 5b603aa..5cef14a 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -213,8 +213,10 @@ public final class CarbonLoaderUtil { String tempLocationKey = CarbonDataProcessorUtil .getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), isCompactionFlow); // form local store location - final String localStoreLocation = CarbonProperties.getInstance() - .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); + final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey); + if (localStoreLocation == null) { + throw new RuntimeException("Store location not set for the key " + tempLocationKey); + } // submit local folder clean up in another thread so that main thread execution is not blocked ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 45719fc..dc37360 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -157,8 +157,9 @@ object CarbonStore { def isSegmentValid( dbName: String, tableName: String, + storePath: String, segmentId: String): Boolean = { - val identifier = AbsoluteTableIdentifier.from(dbName, tableName) + val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName) val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new SegmentStatusManager( identifier).getValidAndInvalidSegments http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala index b76bca3..149e3b1 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala @@ -53,7 +53,6 @@ object TestQueryExecutor { val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister] CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") - .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation) .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords") private def lookupQueryExecutor: Class[_] = { ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader) 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala index b8ba9f7..f8275d1 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala @@ -57,8 +57,8 @@ object CarbonThriftServer { "Using default Value and proceeding") Thread.sleep(30000) } - - val cc = new CarbonContext(sc, args.head) + val storePath = if (args.length > 0) args.head else null + val cc = new CarbonContext(sc, storePath) HiveThriftServer2.startWithContext(cc) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala index 1aeda95..da4b210 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala @@ -44,7 +44,7 @@ class CarbonContext( def this(sc: SparkContext) = { this(sc, - new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath, + null, new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath) } @@ -66,8 +66,14 @@ class CarbonContext( @transient override lazy val catalog = { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + val carbonProperties = 
CarbonProperties.getInstance() + if (storePath != null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + // In case if it is in carbon.properties for backward compatible + } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, + conf.getConfString("spark.sql.warehouse.dir")) + } new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 2fc93e6..71ef6a6 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -69,7 +69,8 @@ private[sql] case class CarbonDatasourceHadoopRelation( carbonTable.getDatabaseName, carbonTable.getFactTableName, CarbonSparkUtil.createSparkMeta(carbonTable), - new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable), + new TableMeta(carbonTable.getCarbonTableIdentifier, + paths.head, absIdentifier.getTablePath, carbonTable), None )(sqlContext) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index e8d3907..7790f59 100644 --- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -236,7 +236,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String, CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, - carbonTable) + null, carbonTable) } } }) @@ -281,7 +281,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String, tableInfo.setMetaDataFilepath(schemaMetadataPath) tableInfo.setStorePath(storePath) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(carbonTableIdentifier, storePath, + val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null, CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName)) val fileType = FileFactory.getFileType(schemaMetadataPath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala index aba6891..34ac940 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala @@ -53,9 +53,9 @@ object CarbonThriftServer { System.setProperty("carbon.properties.filepath", sparkConf.get("carbon.properties.filepath")) } - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, args.head) + val storePath = if (args.length > 0) 
args.head else null - val spark = builder.getOrCreateCarbonSession(args.head) + val spark = builder.getOrCreateCarbonSession(storePath) val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000") try { Thread.sleep(Integer.parseInt(warmUpTime)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index d1d3015..de7f3fb 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -48,7 +48,8 @@ object CarbonSparkUtil { def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = { val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val table = CarbonTable.buildFromTableInfo(tableInfo) - val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table) + val meta = new TableMeta(identifier.getCarbonTableIdentifier, + identifier.getStorePath, tablePath, table) CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName, CarbonSparkUtil.createSparkMeta(table), meta) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index 1054c62..805a421 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { */ private def loadTempCSV(options: CarbonOption): Unit = { // temporary solution: write to csv file, then load the csv into carbon - val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath + val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR) .append("tempCSV") .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 33091aa..43f6a21 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -67,7 +67,7 @@ case class CarbonDictionaryDecoder( override def doExecute(): RDD[InternalRow] = { attachTree(this, "execute") { - val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + val storePath = CarbonEnv.getInstance(sparkSession).storePath val absoluteTableIdentifiers = relations.map { relation => val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) @@ -121,7 +121,7 @@ case class CarbonDictionaryDecoder( override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - 
val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + val storePath = CarbonEnv.getInstance(sparkSession).storePath val absoluteTableIdentifiers = relations.map { relation => val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index d19eb39..9d10ea0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -40,6 +40,8 @@ class CarbonEnv { var carbonSessionInfo: CarbonSessionInfo = _ + var storePath: String = _ + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) // set readsupport class global so that the executor can get it. 
@@ -61,10 +63,14 @@ class CarbonEnv { // add session params after adding DefaultCarbonParams config.addDefaultCarbonSessionParams() carbonMetastore = { - val storePath = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) + val properties = CarbonProperties.getInstance() + storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION) + if (storePath == null) { + storePath = sparkSession.conf.get("spark.sql.warehouse.dir") + properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + } LOGGER.info(s"carbon env initial: $storePath") - CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath) + CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf) } CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") initialized = true http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index b436891..7390cf3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -65,7 +65,7 @@ object CarbonSession { def getOrCreateCarbonSession(): SparkSession = { getOrCreateCarbonSession( - new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath, + null, new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath) } @@ -145,9 +145,14 @@ object CarbonSession { } sc } - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + val carbonProperties = CarbonProperties.getInstance() + if (storePath != null) { + 
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + // In case if it is in carbon.properties for backward compatible + } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, + sparkContext.conf.get("spark.sql.warehouse.dir")) + } session = new CarbonSession(sparkContext) options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } SparkSession.setDefaultSession(session) http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 498ea03..bec163b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -25,15 +25,19 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.CarbonLateDecodeStrategy -import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.{CreateTable, TableModel, TableNewProcessor} +import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DecimalType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import 
org.apache.carbondata.core.metadata.schema import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -146,7 +150,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String], - dataSchema: StructType): String = { + dataSchema: StructType) = { val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase @@ -158,7 +162,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } else { CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(Option(dbName), tableName)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName" } } catch { case ex: NoSuchTableException => @@ -168,7 +172,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider dbName, tableName) CreateTable(cm, false).run(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + getPathForTable(sparkSession, dbName, tableName, parameters) case ex: Exception => throw new Exception("do not have dbname and tablename for carbon table", ex) } @@ -195,9 +199,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider if (parameters.contains("tablePath")) { parameters.get("tablePath").get } else { - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName" + val relation = 
CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + relation.tableMeta.tablePath } } catch { case ex: Exception => @@ -234,22 +238,46 @@ object CarbonSource { val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore val storageFormat = tableDesc.storage val properties = storageFormat.properties - if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) { + if (!properties.contains("carbonSchemaPartsNo")) { val dbName: String = properties.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase val tableName: String = properties.getOrElse("tableName", "").toLowerCase val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName) val tableInfo: TableInfo = TableNewProcessor(model) - val (tablePath, carbonSchemaString) = - metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession) - val map = CarbonUtil.convertToMultiStringMap(tableInfo) + val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName + val schemaEvolutionEntry = new schema.SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) + tableInfo.getFactTable.getSchemaEvalution. 
+ getSchemaEvolutionEntryList.add(schemaEvolutionEntry) + val map = if (metaStore.isReadFromHiveMetaStore) { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) + val schemaMetadataPath = + CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) + tableInfo.setMetaDataFilepath(schemaMetadataPath) + tableInfo.setStorePath(tableIdentifier.getStorePath) + CarbonUtil.convertToMultiStringMap(tableInfo) + } else { + metaStore.saveToDisk(tableInfo, tablePath) + new java.util.HashMap[String, String]() + } properties.foreach(e => map.put(e._1, e._2)) map.put("tablePath", tablePath) // updating params val updatedFormat = storageFormat.copy(properties = map.asScala.toMap) tableDesc.copy(storage = updatedFormat) } else { - tableDesc + val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava) + if (!metaStore.isReadFromHiveMetaStore) { + // save to disk + metaStore.saveToDisk(tableInfo, properties.get("tablePath").get) + // remove schema string from map as we don't store carbon schema to hive metastore + val map = CarbonUtil.removeSchemaFromMap(properties.asJava) + val updatedFormat = storageFormat.copy(properties = map.asScala.toMap) + tableDesc.copy(storage = updatedFormat) + } else { + tableDesc + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 1cc6668..33bba8f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -102,7 +102,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { CarbonAliasDecoderRelation(), rdd, output, - CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath, + CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath, table.carbonTable.getTableInfo.serialize()) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala index 0d5a821..17e456d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} import org.apache.spark.util.AlterTableUtil @@ -29,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import 
org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -167,11 +168,12 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR locks = AlterTableUtil .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)( sparkSession) - carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) - .asInstanceOf[CarbonRelation].tableMeta.carbonTable + val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) + .asInstanceOf[CarbonRelation].tableMeta + carbonTable = tableMeta.carbonTable // get the latest carbon table and check for column existence - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, - carbonTable.getCarbonTableIdentifier) + val carbonTablePath = CarbonStorePath. + getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)) val tableMetadataFile = carbonTablePath.getPath val tableInfo: org.apache.carbondata.format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) @@ -196,7 +198,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR carbonTable.getCarbonTableIdentifier, tableInfo, schemaEvolutionEntry, - carbonTable.getStorePath)(sparkSession) + tableMeta.tablePath)(sparkSession) metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive .runSqlHive( @@ -206,6 +208,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" + s"('tableName'='$newTableName', " + s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')") + sparkSession.catalog.refreshTable(TableIdentifier(newTableName, + Some(oldDatabaseName)).quotedString) LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName") 
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala index 609f39b..2731104 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala @@ -41,8 +41,8 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession) } } - CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession) - .carbonMetastore.storePath) + CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, + CarbonEnv.getInstance(sparkSession).storePath) rows } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala index 760cb06..18d2dc7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala @@ -62,8 +62,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { _, child: LogicalPlan, _, _) => ExecutedCommandExec(LoadTableByInsert(relation, 
child)) :: Nil case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) => - CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession). - carbonMetastore.storePath) + CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath) ExecutedCommandExec(createDb) :: Nil case drop@DropDatabaseCommand(dbName, ifExists, isCascade) => ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index cc18fa3..eeb2022 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension @@ -156,7 +156,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) 
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.storePath) + carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath) var storeLocation = CarbonProperties.getInstance .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, @@ -196,7 +196,9 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru } override def processSchema(sparkSession: SparkSession): Seq[Row] = { - CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables() + val storePath = CarbonEnv.getInstance(sparkSession).storePath + CarbonEnv.getInstance(sparkSession).carbonMetastore. + checkSchemasModifiedTimeAndReloadTables(storePath) val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession) val tbName = cm.tableName @@ -218,10 +220,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru sys.error(s"Table [$tbName] already exists under database [$dbName]") } } else { + val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName) // Add Database to catalog and persist val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val (tablePath, carbonSchemaString) = - catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession) + val tablePath = tableIdentifier.getTablePath + val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath) if (createDSTable) { try { val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) @@ -239,7 +242,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) // call the drop table to delete the created table. 
CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(catalog.storePath, identifier)(sparkSession) + .dropTable(tablePath, identifier)(sparkSession) LOGGER.audit(s"Table creation with Database name [$dbName] " + s"and Table name [$tbName] failed") @@ -332,9 +335,7 @@ object LoadTable { tableInfo, entry, carbonTablePath.getPath)(sparkSession) // update the schema modified time - metastore.updateAndTouchSchemasUpdatedTime( - carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName) + metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation) // update CarbonDataLoadSchema val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName), @@ -526,7 +527,7 @@ case class LoadTable( val carbonLoadModel = new CarbonLoadModel() carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.storePath) + carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath) val table = relation.tableMeta.carbonTable carbonLoadModel.setTableName(table.getFactTableName) @@ -882,7 +883,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - catalog.checkSchemasModifiedTimeAndReloadTables() + val tableIdentifier = + AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, + dbName.toLowerCase, tableName.toLowerCase) + catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath) val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { locksToBeAcquired foreach { @@ -891,7 +895,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, LOGGER.audit(s"Deleting table [$tableName] under 
database [$dbName]") CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(catalog.storePath, identifier)(sparkSession) + .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession) LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") } catch { case ex: Exception => @@ -910,10 +914,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, override def processData(sparkSession: SparkSession): Seq[Row] = { // delete the table folder val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession) - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val metadataFilePath = CarbonStorePath - .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath + val tableIdentifier = + AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName) + val metadataFilePath = + CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath val fileType = FileFactory.getFileType(metadataFilePath) if (FileFactory.isFileExist(metadataFilePath, fileType)) { val file = FileFactory.getCarbonFile(metadataFilePath, fileType) http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 549841b..2407054 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.{RuntimeConfig, 
SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.impl.FileFactory.FileType import org.apache.carbondata.core.fileoperations.FileWriteOperation import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.schema +import org.apache.carbondata.core.metadata.schema.table import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { } } -class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore { +class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { @transient val LOGGER = 
LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") @@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca System.nanoTime() + "" } - lazy val metadata = loadMetadata(storePath, nextQueryId) + val metadata = MetaData(new ArrayBuffer[TableMeta]()) /** @@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca override def createCarbonRelation(parameters: Map[String, String], absIdentifier: AbsoluteTableIdentifier, sparkSession: SparkSession): CarbonRelation = { - lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName, - Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession) - .asInstanceOf[CarbonRelation] + val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName + val tableName = absIdentifier.getCarbonTableIdentifier.getTableName + val tables = getTableFromMetadataCache(database, tableName) + tables match { + case Some(t) => + CarbonRelation(database, tableName, + CarbonSparkUtil.createSparkMeta(t.carbonTable), t) + case None => + readCarbonSchema(absIdentifier) match { + case Some(meta) => + CarbonRelation(database, tableName, + CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta) + case None => + throw new NoSuchTableException(database, tableName) + } + } } def lookupRelation(dbName: Option[String], tableName: String) @@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } - def lookupRelation(tableIdentifier: TableIdentifier) + override def lookupRelation(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): LogicalPlan = { - checkSchemasModifiedTimeAndReloadTables() val database = tableIdentifier.database.getOrElse( - sparkSession.catalog.currentDatabase - ) - val tables = getTableFromMetadata(database, tableIdentifier.table, true) - tables match { - case Some(t) 
=> - CarbonRelation(database, tableIdentifier.table, - CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head) - case None => - throw new NoSuchTableException(database, tableIdentifier.table) + sparkSession.catalog.currentDatabase) + val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { + case SubqueryAlias(_, + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), + _) => + carbonDatasourceHadoopRelation.carbonRelation + case LogicalRelation( + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + carbonDatasourceHadoopRelation.carbonRelation + case _ => throw new NoSuchTableException(database, tableIdentifier.table) } + relation } /** @@ -123,8 +138,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param tableName * @return */ - def getTableFromMetadata(database: String, - tableName: String, readStore: Boolean = false): Option[TableMeta] = { + def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = { metadata.tablesMeta .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) @@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca tableExists(TableIdentifier(table, databaseOp))(sparkSession) } - def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - checkSchemasModifiedTimeAndReloadTables() - val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val tables = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table)) - tables.nonEmpty - } - - def loadMetadata(metadataPath: String, queryId: String): MetaData = { - val recorder = 
CarbonTimeStatisticsFactory.createDriverRecorder() - val statistic = new QueryStatistic() - // creating zookeeper instance once. - // if zookeeper is configured as carbon lock type. - val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null) - if (null != zookeeperurl) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) - } - if (metadataPath == null) { - return null - } - // if no locktype is configured and store type is HDFS set HDFS lock as default - if (null == CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.LOCK_TYPE) && - FileType.HDFS == FileFactory.getFileType(metadataPath)) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.LOCK_TYPE, - CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS - ) - LOGGER.info("Default lock type HDFSLOCK is configured") + override def tableExists(tableIdentifier: TableIdentifier) + (sparkSession: SparkSession): Boolean = { + try { + lookupRelation(tableIdentifier)(sparkSession) + } catch { + case e: Exception => + return false } - val fileType = FileFactory.getFileType(metadataPath) - val metaDataBuffer = new ArrayBuffer[TableMeta] - fillMetaData(metadataPath, fileType, metaDataBuffer) - updateSchemasUpdatedTime(readSchemaFileSystemTime("", "")) - statistic.addStatistics(QueryStatisticsConstants.LOAD_META, - System.currentTimeMillis()) - recorder.recordStatisticsForDriver(statistic, queryId) - MetaData(metaDataBuffer) + true } - private def fillMetaData(basePath: String, fileType: FileType, - metaDataBuffer: ArrayBuffer[TableMeta]): Unit = { - val databasePath = basePath // + "/schemas" - try { - if (FileFactory.isFileExist(databasePath, fileType)) { - val file = FileFactory.getCarbonFile(databasePath, fileType) - val databaseFolders = file.listFiles() - - databaseFolders.foreach(databaseFolder => { - if (databaseFolder.isDirectory) { - val dbName = databaseFolder.getName - val tableFolders = databaseFolder.listFiles() - - 
tableFolders.foreach(tableFolder => { - if (tableFolder.isDirectory) { - val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName, - tableFolder.getName, UUID.randomUUID().toString) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath, - carbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableName = tableFolder.getName - val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName - val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath) - val schemaFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath - wrapperTableInfo.setStorePath(storePath) - wrapperTableInfo - .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, - carbonTable) - } - } - }) - } - }) - } else { - // Create folders and files. - FileFactory.mkdirs(databasePath, fileType) - } - } catch { - case s: java.io.FileNotFoundException => - s.printStackTrace() - // Create folders and files. 
- FileFactory.mkdirs(databasePath, fileType) + private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = { + val dbName = identifier.getCarbonTableIdentifier.getDatabaseName + val tableName = identifier.getCarbonTableIdentifier.getTableName + val storePath = identifier.getStorePath + val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(), + tableName.toLowerCase(), UUID.randomUUID().toString) + val carbonTablePath = + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + val tableMetadataFile = carbonTablePath.getSchemaFilePath + val fileType = FileFactory.getFileType(tableMetadataFile) + if (FileFactory.isFileExist(tableMetadataFile, fileType)) { + val tableUniqueName = dbName + "_" + tableName + val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath) + val schemaFilePath = CarbonStorePath + .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath + wrapperTableInfo.setStorePath(storePath) + wrapperTableInfo + .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) + CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier, + identifier.getStorePath, + identifier.getTablePath, + carbonTable) + metadata.tablesMeta += tableMeta + Some(tableMeta) + } else { + None } } @@ -238,15 +201,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param newTableIdentifier * @param thriftTableInfo * @param schemaEvolutionEntry - * @param carbonStorePath + * @param tablePath * @param sparkSession */ def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: 
CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - carbonStorePath: String) - (sparkSession: SparkSession): String = { + tablePath: String) (sparkSession: SparkSession): String = { + val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val schemaConverter = new ThriftWrapperSchemaConverterImpl if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) @@ -255,11 +218,19 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca .fromExternalToWrapperTableInfo(thriftTableInfo, newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, - carbonStorePath) - createSchemaThriftFile(wrapperTableInfo, + absoluteTableIdentifier.getStorePath) + val identifier = + new CarbonTableIdentifier(newTableIdentifier.getDatabaseName, + newTableIdentifier.getTableName, + wrapperTableInfo.getFactTable.getTableId) + val path = createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, + identifier) + addTableCache(wrapperTableInfo, + AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath, newTableIdentifier.getDatabaseName, - newTableIdentifier.getTableName)(sparkSession) + newTableIdentifier.getTableName)) + path } /** @@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * * @param carbonTableIdentifier * @param thriftTableInfo - * @param carbonStorePath + * @param tablePath * @param sparkSession */ def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, - carbonStorePath: String) - (sparkSession: SparkSession): String = { + tablePath: String)(sparkSession: SparkSession): String = { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val schemaConverter = new ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter 
.fromExternalToWrapperTableInfo(thriftTableInfo, carbonTableIdentifier.getDatabaseName, carbonTableIdentifier.getTableName, - carbonStorePath) + tableIdentifier.getStorePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) - createSchemaThriftFile(wrapperTableInfo, + wrapperTableInfo.setStorePath(tableIdentifier.getStorePath) + val path = createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName)(sparkSession) + tableIdentifier.getCarbonTableIdentifier) + addTableCache(wrapperTableInfo, tableIdentifier) + path } @@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * Load CarbonTable from wrapper tableInfo * */ - def createTableFromThrift( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = { - if (tableExists(tableName, Some(dbName))(sparkSession)) { - sys.error(s"Table [$tableName] already exists under Database [$dbName]") - } - val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime) + def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) { val schemaConverter = new ThriftWrapperSchemaConverterImpl + val dbName = tableInfo.getDatabaseName + val tableName = tableInfo.getFactTable.getTableName val thriftTableInfo = schemaConverter .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName) - thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history - .add(schemaEvolutionEntry) - val carbonTablePath = createSchemaThriftFile(tableInfo, + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + tableInfo.setStorePath(identifier.getStorePath) + createSchemaThriftFile(tableInfo, thriftTableInfo, - dbName, - tableName)(sparkSession) + identifier.getCarbonTableIdentifier) 
LOGGER.info(s"Table $tableName for Database $dbName created successfully.") - (carbonTablePath, "") + } + + /** + * Generates schema string from TableInfo + */ + override def generateTableSchemaString(tableInfo: schema.table.TableInfo, + tablePath: String): String = { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) + val schemaMetadataPath = + CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) + tableInfo.setMetaDataFilepath(schemaMetadataPath) + tableInfo.setStorePath(tableIdentifier.getStorePath) + val schemaEvolutionEntry = new schema.SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) + tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) + removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName) + CarbonMetadata.getInstance().loadTableMetadata(tableInfo) + addTableCache(tableInfo, tableIdentifier) + CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",") } /** @@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * * @param tableInfo * @param thriftTableInfo - * @param dbName - * @param tableName - * @param sparkSession * @return */ - private def createSchemaThriftFile( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - dbName: String, tableName: String) - (sparkSession: SparkSession): String = { - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, - tableInfo.getFactTable.getTableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + private def createSchemaThriftFile(tableInfo: schema.table.TableInfo, + thriftTableInfo: TableInfo, + carbonTableIdentifier: CarbonTableIdentifier): String = { + val carbonTablePath = 
CarbonStorePath. + getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier) val schemaFilePath = carbonTablePath.getSchemaFilePath val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(storePath) val fileType = FileFactory.getFileType(schemaMetadataPath) if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { FileFactory.mkdirs(schemaMetadataPath, fileType) @@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca thriftWriter.open(FileWriteOperation.OVERWRITE) thriftWriter.write(thriftTableInfo) thriftWriter.close() - removeTableFromMetadata(dbName, tableName) + updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath)) + carbonTablePath.getPath + } + + protected def addTableCache(tableInfo: table.TableInfo, + absoluteTableIdentifier: AbsoluteTableIdentifier) = { + val identifier = absoluteTableIdentifier.getCarbonTableIdentifier + CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName) + removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(carbonTableIdentifier, storePath, - CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)) + val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath, + absoluteTableIdentifier.getTablePath, + CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)) metadata.tablesMeta += tableMeta - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - carbonTablePath.getPath } /** @@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca * @param tableName */ def removeTableFromMetadata(dbName: String, tableName: String): Unit = { - val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName) + val 
metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName) metadataToBeRemoved match { case Some(tableMeta) => metadata.tablesMeta -= tableMeta CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) case None => - LOGGER.debug(s"No entry for table $tableName in database $dbName") + if (LOGGER.isDebugEnabled) { + LOGGER.debug(s"No entry for table $tableName in database $dbName") + } } } @@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val tableName = tableIdentifier.table.toLowerCase - - val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath, - new CarbonTableIdentifier(dbName, tableName, "")).getPath - - val fileType = FileFactory.getFileType(tablePath) - FileFactory.isFileExist(tablePath, fileType) + try { + val tablePath = lookupRelation(tableIdentifier)(sparkSession). 
+ asInstanceOf[CarbonRelation].tableMeta.tablePath + val fileType = FileFactory.getFileType(tablePath) + FileFactory.isFileExist(tablePath, fileType) + } catch { + case e: Exception => + false + } } - def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier) + def dropTable(tablePath: String, tableIdentifier: TableIdentifier) (sparkSession: SparkSession) { val dbName = tableIdentifier.database.get val tableName = tableIdentifier.table - - val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath, - new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache @@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca if (FileFactory.isFileExist(metadataFilePath, fileType)) { // while drop we should refresh the schema modified time so that if any thing has changed // in the other beeline need to update. 
- checkSchemasModifiedTimeAndReloadTables - - val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, - tableIdentifier.table) - metadataToBeRemoved match { - case Some(tableMeta) => - metadata.tablesMeta -= tableMeta - CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - case None => - LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName") - } + checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath) + + removeTableFromMetadata(dbName, tableName) + updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath)) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables sparkSession.sessionState.catalog.refreshTable(tableIdentifier) } } - private def getTimestampFileAndType(databaseName: String, tableName: String) = { - val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE + private def getTimestampFileAndType(basePath: String) = { + val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE val timestampFileType = FileFactory.getFileType(timestampFile) (timestampFile, timestampFileType) } @@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca tableModifiedTimeStore.put("default", timeStamp) } - def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) { - updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName)) + def updateAndTouchSchemasUpdatedTime(basePath: String) { + updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath)) } - /** - * This method will read the timestamp of empty schema file - * - * @param databaseName - * @param tableName - * @return - */ - private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, 
timestampFileType) = getTimestampFileAndType(databaseName, tableName) - if (FileFactory.isFileExist(timestampFile, timestampFileType)) { - FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime - } else { - System.currentTimeMillis() - } - } /** * This method will check and create an empty schema timestamp file * - * @param databaseName - * @param tableName * @return */ - private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) + private def touchSchemaFileSystemTime(basePath: String): Long = { + val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath) if (!FileFactory.isFileExist(timestampFile, timestampFileType)) { - LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName") + LOGGER.audit(s"Creating timestamp file for $basePath") FileFactory.createNewFile(timestampFile, timestampFileType) } val systemTime = System.currentTimeMillis() @@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca systemTime } - def checkSchemasModifiedTimeAndReloadTables() { - val (timestampFile, timestampFileType) = getTimestampFileAndType("", "") + def checkSchemasModifiedTimeAndReloadTables(storePath: String) { + val (timestampFile, timestampFileType) = + getTimestampFileAndType(storePath) if (FileFactory.isFileExist(timestampFile, timestampFileType)) { if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType). 
getLastModifiedTime == @@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca } private def refreshCache() { - metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta + metadata.tablesMeta.clear() } override def isReadFromHiveMetaStore: Boolean = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index a8f92ce..c328130 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -16,21 +16,15 @@ */ package org.apache.spark.sql.hive -import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession} +import org.apache.spark.sql.{RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree -import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.format @@ -41,12 +35,10 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil /** * Metastore to store carbonschema in hive */ -class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) - extends CarbonFileMetastore(conf, storePath) { +class CarbonHiveMetaStore(conf: RuntimeConfig) extends CarbonFileMetastore(conf) { override def isReadFromHiveMetaStore: Boolean = true - /** * Create spark session from paramters. * @@ -61,7 +53,7 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) if (info != null) { val table = CarbonTable.buildFromTableInfo(info) val meta = new TableMeta(table.getCarbonTableIdentifier, - table.getStorePath, table) + absIdentifier.getStorePath, absIdentifier.getTablePath, table) CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName, CarbonSparkUtil.createSparkMeta(table), meta) } else { @@ -69,111 +61,30 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) } } - override def lookupRelation(tableIdentifier: TableIdentifier) - (sparkSession: SparkSession): LogicalPlan = { - val database = tableIdentifier.database.getOrElse( - sparkSession.catalog.currentDatabase) - val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { - case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => - carbonDatasourceHadoopRelation.carbonRelation - case LogicalRelation( - carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => - carbonDatasourceHadoopRelation.carbonRelation - case _ => throw new NoSuchTableException(database, tableIdentifier.table) - } - relation - } - - /** - * This method will search for a table in the catalog metadata - * - * @param database - * @param tableName - * 
@return - */ - override def getTableFromMetadata(database: String, - tableName: String, - readStore: Boolean): Option[TableMeta] = { - if (!readStore) { - None - } else { - super.getTableFromMetadata(database, tableName, readStore) - } - } - - override def tableExists(tableIdentifier: TableIdentifier) - (sparkSession: SparkSession): Boolean = { - try { - lookupRelation(tableIdentifier)(sparkSession) - } catch { - case e: Exception => - return false - } - true - } - - override def loadMetadata(metadataPath: String, - queryId: String): MetaData = { - MetaData(new ArrayBuffer[TableMeta]) - } - - - /** - * - * Prepare Thrift Schema from wrapper TableInfo and write to Schema file. - * Load CarbonTable from wrapper tableInfo - * - */ - override def createTableFromThrift(tableInfo: TableInfo, dbName: String, - tableName: String)(sparkSession: SparkSession): (String, String) = { - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, - tableInfo.getFactTable.getTableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) - val schemaMetadataPath = - CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) - tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(storePath) - val schemaEvolutionEntry = new schema.SchemaEvolutionEntry - schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) - tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) - removeTableFromMetadata(dbName, tableName) - CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - (carbonTablePath.getPath, CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")) - } - - /** - * This method will remove the table meta from catalog metadata array - * - * @param dbName - * @param tableName - */ - override def removeTableFromMetadata(dbName: String, - tableName: String): Unit = { - // do nothing - } override def isTablePathExists(tableIdentifier: 
TableIdentifier) (sparkSession: SparkSession): Boolean = { tableExists(tableIdentifier)(sparkSession) } - override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier) + override def dropTable(tablePath: String, tableIdentifier: TableIdentifier) (sparkSession: SparkSession): Unit = { val dbName = tableIdentifier.database.get val tableName = tableIdentifier.table + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) } + checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath) + removeTableFromMetadata(dbName, tableName) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables sparkSession.sessionState.catalog.refreshTable(tableIdentifier) } - override def checkSchemasModifiedTimeAndReloadTables(): Unit = { + override def checkSchemasModifiedTimeAndReloadTables(storePath: String) { // do nothing now } @@ -200,23 +111,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) * @param newTableIdentifier * @param thriftTableInfo * @param schemaEvolutionEntry - * @param carbonStorePath * @param sparkSession */ override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - carbonStorePath: String) + tablePath: String) (sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) } updateHiveMetaStore(newTableIdentifier, oldTableIdentifier, 
thriftTableInfo, - carbonStorePath, + identifier.getStorePath, sparkSession, schemaConverter) } @@ -232,23 +143,20 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, carbonStorePath) - wrapperTableInfo.setStorePath(storePath) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier) + wrapperTableInfo.setStorePath(carbonStorePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier) val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath) val dbName = oldTableIdentifier.getDatabaseName val tableName = oldTableIdentifier.getTableName - val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName, - wrapperTableInfo.getFactTable.getTableId) val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive( s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString) - removeTableFromMetadata(wrapperTableInfo.getDatabaseName, - wrapperTableInfo.getFactTable.getTableName) + removeTableFromMetadata(dbName, tableName) CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath + CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath } /** @@ -256,21 +164,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) * * @param carbonTableIdentifier * @param thriftTableInfo - * @param carbonStorePath * @param sparkSession */ override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, - carbonStorePath: String) + tablePath: String) 
(sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) updateHiveMetaStore(carbonTableIdentifier, carbonTableIdentifier, thriftTableInfo, - carbonStorePath, + identifier.getStorePath, sparkSession, schemaConverter) } + + }
