[Compatibility] Added changes for backward compatibility. This PR fixes issues related to compatibility between old and new versions. Issues fixed: 1. The schema file name was different in one of the previous versions. 2. The bucket number was not supported in previous versions. 3. Table parameters were stored in lower case, while the current version reads them in camel case.
This closes #1747 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/02eefca1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/02eefca1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/02eefca1 Branch: refs/heads/branch-1.3 Commit: 02eefca15862a8667d53e247272afb68efe7af60 Parents: 1b224a4 Author: kunal642 <[email protected]> Authored: Mon Nov 20 20:36:54 2017 +0530 Committer: manishgupta88 <[email protected]> Committed: Fri Feb 2 12:08:50 2018 +0530 ---------------------------------------------------------------------- .../core/util/path/CarbonTablePath.java | 22 ++++++++- .../carbondata/spark/util/CarbonScalaUtil.scala | 47 ++++++++++++++++++++ .../org/apache/spark/sql/CarbonSource.scala | 33 ++++++++------ 3 files changed, 87 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/02eefca1/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index fab6289..d8c64c4 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -233,7 +233,7 @@ public class CarbonTablePath extends Path { * @return absolute path of schema file */ public String getSchemaFilePath() { - return getMetaDataDir() + File.separator + SCHEMA_FILE; + return getActualSchemaFilePath(tablePath); } /** @@ -242,7 +242,22 @@ public class CarbonTablePath extends Path { * @return schema file path */ public static String getSchemaFilePath(String tablePath) { - return tablePath + File.separator + METADATA_DIR + File.separator + 
SCHEMA_FILE; + return getActualSchemaFilePath(tablePath); + } + + private static String getActualSchemaFilePath(String tablePath) { + String metaPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR; + CarbonFile carbonFile = FileFactory.getCarbonFile(metaPath); + CarbonFile[] schemaFile = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(SCHEMA_FILE); + } + }); + if (schemaFile != null && schemaFile.length > 0) { + return schemaFile[0].getAbsolutePath(); + } else { + return metaPath + CarbonCommonConstants.FILE_SEPARATOR + SCHEMA_FILE; + } } /** @@ -351,6 +366,9 @@ public class CarbonTablePath extends Path { private static String getCarbonIndexFileName(String taskNo, int bucketNumber, String factUpdatedtimeStamp) { + if (bucketNumber == -1) { + return taskNo + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT; + } return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/02eefca1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 86d25b4..262adf2 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -404,4 +404,51 @@ object CarbonScalaUtil { }) otherFields } + + /** + * If the table is from an old store then the table parameters are in lowercase. In the current + * code we are reading the parameters as camel case. 
+ * This method will convert all the schema parts to camel case + * + * @param parameters + * @return + */ + def getDeserializedParameters(parameters: Map[String, String]): Map[String, String] = { + val keyParts = parameters.getOrElse("spark.sql.sources.options.keys.numparts", "0").toInt + if (keyParts == 0) { + parameters + } else { + val keyStr = 0 until keyParts map { + i => parameters(s"spark.sql.sources.options.keys.part.$i") + } + val finalProperties = scala.collection.mutable.Map.empty[String, String] + keyStr foreach { + key => + var value = "" + for (numValues <- 0 until parameters(key.toLowerCase() + ".numparts").toInt) { + value += parameters(key.toLowerCase() + ".part" + numValues) + } + finalProperties.put(key, value) + } + // Database name would be extracted from the parameter first. There can be a scenario where + // the dbName is not written to the old schema therefore to be on a safer side we are + // extracting dbName from tableName if it exists. + val dbAndTableName = finalProperties("tableName").split(".") + if (dbAndTableName.length > 1) { + finalProperties.put("dbName", dbAndTableName(0)) + finalProperties.put("tableName", dbAndTableName(1)) + } else { + finalProperties.put("tableName", dbAndTableName(0)) + } + // Overriding the tablePath in case tablepath already exists. This will happen when old + // table schema is updated by the new code then both `path` and `tablepath` will exist. 
In + // this case use tablepath + parameters.get("tablepath") match { + case Some(tablePath) => finalProperties.put("tablePath", tablePath) + case None => + } + finalProperties.toMap + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/02eefca1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index e61b636..7d70534 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.util.CarbonScalaUtil import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory} /** @@ -59,16 +60,20 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider CarbonEnv.getInstance(sqlContext.sparkSession) // if path is provided we can directly create Hadoop relation. 
\ // Otherwise create datasource relation - parameters.get("tablePath") match { + val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters) + newParameters.get("tablePath") match { case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), - parameters, + newParameters, None) case _ => - val options = new CarbonOption(parameters) + val options = new CarbonOption(newParameters) val tablePath = CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession) - CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None) + CarbonDatasourceHadoopRelation(sqlContext.sparkSession, + Array(tablePath), + newParameters, + None) } } @@ -79,13 +84,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider parameters: Map[String, String], data: DataFrame): BaseRelation = { CarbonEnv.getInstance(sqlContext.sparkSession) + val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters) // User should not specify path since only one store is supported in carbon currently, // after we support multi-store, we can remove this limitation - require(!parameters.contains("path"), "'path' should not be specified, " + + require(!newParameters.contains("path"), "'path' should not be specified, " + "the path to store carbon file is the 'storePath' " + "specified when creating CarbonContext") - val options = new CarbonOption(parameters) + val options = new CarbonOption(newParameters) val tablePath = new Path( CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession)) val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) @@ -108,12 +114,12 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider if (doSave) { // save data when the save mode is Overwrite. 
- new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(parameters) + new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(newParameters) } else if (doAppend) { - new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(parameters) + new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(newParameters) } - createRelation(sqlContext, parameters, data.schema) + createRelation(sqlContext, newParameters, data.schema) } // called by DDL operation with a USING clause @@ -123,9 +129,10 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider dataSchema: StructType): BaseRelation = { CarbonEnv.getInstance(sqlContext.sparkSession) addLateDecodeOptimization(sqlContext.sparkSession) + val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters) val dbName: String = - CarbonEnv.getDatabaseName(parameters.get("dbName"))(sqlContext.sparkSession) - val tableOption: Option[String] = parameters.get("tableName") + CarbonEnv.getDatabaseName(newParameters.get("dbName"))(sqlContext.sparkSession) + val tableOption: Option[String] = newParameters.get("tableName") if (tableOption.isEmpty) { CarbonException.analysisException("Table creation failed. Table name is not specified") } @@ -136,9 +143,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } val (path, updatedParams) = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { - getPathForTable(sqlContext.sparkSession, dbName, tableName, parameters) + getPathForTable(sqlContext.sparkSession, dbName, tableName, newParameters) } else { - createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema) + createTableIfNotExists(sqlContext.sparkSession, newParameters, dataSchema) } CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), updatedParams,
