[CARBONDATA-1464] Fixed SparkSessionExample: not able to create a table from SparkSession because of a missing tablePath. This PR generates the tablePath from the store location.
This closes #1342 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2d75c466 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2d75c466 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2d75c466 Branch: refs/heads/streaming_ingest Commit: 2d75c4661583d9765c11874ffc9dd804154b74ea Parents: cd2332e Author: Ravindra Pesala <ravi.pes...@gmail.com> Authored: Fri Sep 8 21:20:18 2017 +0530 Committer: chenliang613 <chenliang...@apache.org> Committed: Sat Sep 9 07:58:02 2017 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/CarbonSource.scala | 89 +++++++++++--------- 1 file changed, 48 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d75c466/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index bec163b..1b021b0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -25,8 +25,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.CarbonLateDecodeStrategy -import org.apache.spark.sql.execution.command.{CreateTable, TableModel, TableNewProcessor} -import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor} +import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation} import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import 
org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.sources._ @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StructType import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema +import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} @@ -130,14 +130,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider if (tableName.contains(" ")) { sys.error("Table creation failed. Table name cannot contain blank space") } - val path = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName) + val (path, updatedParams) = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { getPathForTable(sqlContext.sparkSession, dbName, tableName, parameters) } else { createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema) } - CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters, + CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), updatedParams, Option(dataSchema)) } @@ -162,17 +162,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } else { CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(Option(dbName), tableName)(sparkSession) - CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName" + (CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName", parameters) } } catch { case ex: NoSuchTableException => - val cm: TableModel = CarbonSource.createTableInfoFromParams( - parameters, - dataSchema, - dbName, - tableName) - CreateTable(cm, false).run(sparkSession) - 
getPathForTable(sparkSession, dbName, tableName, parameters) + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val updatedParams = + CarbonSource.updateAndCreateTable(dataSchema, sparkSession, metaStore, parameters) + getPathForTable(sparkSession, dbName, tableName, updatedParams) case ex: Exception => throw new Exception("do not have dbname and tablename for carbon table", ex) } @@ -187,7 +184,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider * @return */ private def getPathForTable(sparkSession: SparkSession, dbName: String, - tableName : String, parameters: Map[String, String]): String = { + tableName : String, parameters: Map[String, String]): (String, Map[String, String]) = { if (StringUtils.isBlank(tableName)) { throw new MalformedCarbonCommandException("The Specified Table Name is Blank") @@ -197,11 +194,13 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } try { if (parameters.contains("tablePath")) { - parameters.get("tablePath").get + (parameters("tablePath"), parameters) + } else if (!sparkSession.isInstanceOf[CarbonSession]) { + (CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName, parameters) } else { val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - relation.tableMeta.tablePath + (relation.tableMeta.tablePath, parameters) } } catch { case ex: Exception => @@ -239,32 +238,9 @@ object CarbonSource { val storageFormat = tableDesc.storage val properties = storageFormat.properties if (!properties.contains("carbonSchemaPartsNo")) { - val dbName: String = properties.getOrElse("dbName", - CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase - val tableName: String = properties.getOrElse("tableName", "").toLowerCase - val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName) - val tableInfo: TableInfo = 
TableNewProcessor(model) - val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName - val schemaEvolutionEntry = new schema.SchemaEvolutionEntry - schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) - tableInfo.getFactTable.getSchemaEvalution. - getSchemaEvolutionEntryList.add(schemaEvolutionEntry) - val map = if (metaStore.isReadFromHiveMetaStore) { - val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) - val schemaMetadataPath = - CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) - tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(tableIdentifier.getStorePath) - CarbonUtil.convertToMultiStringMap(tableInfo) - } else { - metaStore.saveToDisk(tableInfo, tablePath) - new java.util.HashMap[String, String]() - } - properties.foreach(e => map.put(e._1, e._2)) - map.put("tablePath", tablePath) + val map = updateAndCreateTable(tableDesc.schema, sparkSession, metaStore, properties) // updating params - val updatedFormat = storageFormat.copy(properties = map.asScala.toMap) + val updatedFormat = storageFormat.copy(properties = map) tableDesc.copy(storage = updatedFormat) } else { val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava) @@ -280,4 +256,35 @@ object CarbonSource { } } } + + def updateAndCreateTable(dataSchema: StructType, + sparkSession: SparkSession, + metaStore: CarbonMetaStore, + properties: Map[String, String]): Map[String, String] = { + val dbName: String = properties.getOrElse("dbName", + CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase + val tableName: String = properties.getOrElse("tableName", "").toLowerCase + val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName) + val tableInfo: TableInfo = TableNewProcessor(model) + val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName + 
val schemaEvolutionEntry = new SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) + tableInfo.getFactTable.getSchemaEvalution. + getSchemaEvolutionEntryList.add(schemaEvolutionEntry) + val map = if (metaStore.isReadFromHiveMetaStore) { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) + val schemaMetadataPath = + CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) + tableInfo.setMetaDataFilepath(schemaMetadataPath) + tableInfo.setStorePath(tableIdentifier.getStorePath) + CarbonUtil.convertToMultiStringMap(tableInfo) + } else { + metaStore.saveToDisk(tableInfo, tablePath) + new java.util.HashMap[String, String]() + } + properties.foreach(e => map.put(e._1, e._2)) + map.put("tablePath", tablePath) + map.asScala.toMap + } }