[CARBONDATA-1464] Fixed SparkSessionExample

Not able to create a table from SparkSession because of the missing tablePath. This
PR generates tablePath from the store location.

This closes #1342


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2d75c466
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2d75c466
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2d75c466

Branch: refs/heads/streaming_ingest
Commit: 2d75c4661583d9765c11874ffc9dd804154b74ea
Parents: cd2332e
Author: Ravindra Pesala <ravi.pes...@gmail.com>
Authored: Fri Sep 8 21:20:18 2017 +0530
Committer: chenliang613 <chenliang...@apache.org>
Committed: Sat Sep 9 07:58:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/CarbonSource.scala     | 89 +++++++++++---------
 1 file changed, 48 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d75c466/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index bec163b..1b021b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{CreateTable, TableModel, 
TableNewProcessor}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
@@ -130,14 +130,14 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
     if (tableName.contains(" ")) {
       sys.error("Table creation failed. Table name cannot contain blank space")
     }
-    val path = if 
(sqlContext.sparkSession.sessionState.catalog.listTables(dbName)
+    val (path, updatedParams) = if 
(sqlContext.sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
         getPathForTable(sqlContext.sparkSession, dbName, tableName, parameters)
     } else {
         createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
     }
 
-    CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), 
parameters,
+    CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), 
updatedParams,
       Option(dataSchema))
   }
 
@@ -162,17 +162,14 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
       } else {
         CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(Option(dbName), tableName)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName"
+        (CarbonEnv.getInstance(sparkSession).storePath + 
s"/$dbName/$tableName", parameters)
       }
     } catch {
       case ex: NoSuchTableException =>
-        val cm: TableModel = CarbonSource.createTableInfoFromParams(
-          parameters,
-          dataSchema,
-          dbName,
-          tableName)
-        CreateTable(cm, false).run(sparkSession)
-        getPathForTable(sparkSession, dbName, tableName, parameters)
+        val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        val updatedParams =
+          CarbonSource.updateAndCreateTable(dataSchema, sparkSession, 
metaStore, parameters)
+        getPathForTable(sparkSession, dbName, tableName, updatedParams)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon 
table", ex)
     }
@@ -187,7 +184,7 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
    * @return
    */
   private def getPathForTable(sparkSession: SparkSession, dbName: String,
-      tableName : String, parameters: Map[String, String]): String = {
+      tableName : String, parameters: Map[String, String]): (String, 
Map[String, String]) = {
 
     if (StringUtils.isBlank(tableName)) {
       throw new MalformedCarbonCommandException("The Specified Table Name is 
Blank")
@@ -197,11 +194,13 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
     }
     try {
       if (parameters.contains("tablePath")) {
-        parameters.get("tablePath").get
+        (parameters("tablePath"), parameters)
+      } else if (!sparkSession.isInstanceOf[CarbonSession]) {
+        (CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + 
tableName, parameters)
       } else {
         val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(Option(dbName), 
tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        relation.tableMeta.tablePath
+        (relation.tableMeta.tablePath, parameters)
       }
     } catch {
       case ex: Exception =>
@@ -239,32 +238,9 @@ object CarbonSource {
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
     if (!properties.contains("carbonSchemaPartsNo")) {
-      val dbName: String = properties.getOrElse("dbName",
-        CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
-      val tableName: String = properties.getOrElse("tableName", "").toLowerCase
-      val model = createTableInfoFromParams(properties, tableDesc.schema, 
dbName, tableName)
-      val tableInfo: TableInfo = TableNewProcessor(model)
-      val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + 
dbName + "/" + tableName
-      val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
-      schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-      tableInfo.getFactTable.getSchemaEvalution.
-        getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-      val map = if (metaStore.isReadFromHiveMetaStore) {
-        val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-        val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(tableIdentifier)
-        val schemaMetadataPath =
-          
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-        tableInfo.setMetaDataFilepath(schemaMetadataPath)
-        tableInfo.setStorePath(tableIdentifier.getStorePath)
-        CarbonUtil.convertToMultiStringMap(tableInfo)
-      } else {
-        metaStore.saveToDisk(tableInfo, tablePath)
-        new java.util.HashMap[String, String]()
-      }
-      properties.foreach(e => map.put(e._1, e._2))
-      map.put("tablePath", tablePath)
+      val map = updateAndCreateTable(tableDesc.schema, sparkSession, 
metaStore, properties)
       // updating params
-      val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+      val updatedFormat = storageFormat.copy(properties = map)
       tableDesc.copy(storage = updatedFormat)
     } else {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
@@ -280,4 +256,35 @@ object CarbonSource {
       }
     }
   }
+
+  def updateAndCreateTable(dataSchema: StructType,
+      sparkSession: SparkSession,
+      metaStore: CarbonMetaStore,
+      properties: Map[String, String]): Map[String, String] = {
+    val dbName: String = properties.getOrElse("dbName",
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val tableName: String = properties.getOrElse("tableName", "").toLowerCase
+    val model = createTableInfoFromParams(properties, dataSchema, dbName, 
tableName)
+    val tableInfo: TableInfo = TableNewProcessor(model)
+    val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + 
dbName + "/" + tableName
+    val schemaEvolutionEntry = new SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.
+      getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    val map = if (metaStore.isReadFromHiveMetaStore) {
+      val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+      val schemaMetadataPath =
+        
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+      tableInfo.setMetaDataFilepath(schemaMetadataPath)
+      tableInfo.setStorePath(tableIdentifier.getStorePath)
+      CarbonUtil.convertToMultiStringMap(tableInfo)
+    } else {
+      metaStore.saveToDisk(tableInfo, tablePath)
+      new java.util.HashMap[String, String]()
+    }
+    properties.foreach(e => map.put(e._1, e._2))
+    map.put("tablePath", tablePath)
+    map.asScala.toMap
+  }
 }

Reply via email to