Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 0be0f3083 -> 699d0a050


delete success file


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a03b4600
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a03b4600
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a03b4600

Branch: refs/heads/master
Commit: a03b460094ada6c9547542e6bfce51bd3e33ec8b
Parents: 0be0f30
Author: jackylk <jacky.li...@huawei.com>
Authored: Wed Sep 21 11:37:02 2016 -0700
Committer: chenliang613 <chenliang...@apache.org>
Committed: Thu Sep 22 00:18:45 2016 -0700

----------------------------------------------------------------------
 .../examples/DataFrameAPIExample.scala          |  1 +
 .../spark/implicit/DataFrameFuncs.scala         | 59 +++++++++++---------
 2 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a03b4600/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index d2ee959..31750ac 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -39,6 +39,7 @@ object DataFrameAPIExample {
     df.write
       .format("carbondata")
       .option("tableName", "carbon1")
+      .option("compress", "true")
       .mode(SaveMode.Overwrite)
       .save()
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a03b4600/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
index d5185cc..44594eb 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
@@ -18,16 +18,17 @@
 package org.apache.carbondata.spark
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonContext, DataFrame, SaveMode}
+import org.apache.spark.Logging
+import org.apache.spark.sql.{CarbonContext, DataFrame, DataFrameWriter, SaveMode}
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
 
-class DataFrameFuncs(dataFrame: DataFrame) {
+class DataFrameFuncs(dataFrame: DataFrame) extends Logging {
 
   /**
-   * Saves DataFrame as CarbonData files.
-   */
+    * Saves DataFrame as CarbonData files.
+    */
   def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
     // To avoid derby problem, dataframe need to be writen and read using CarbonContext
     require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
@@ -36,42 +37,47 @@ class DataFrameFuncs(dataFrame: DataFrame) {
 
     val options = new CarbonOption(parameters)
     val tableName = options.tableName
-    val dbName = options.dbName
 
     // temporary solution: write to csv file, then load the csv into carbon
     val tempCSVFolder = s"./tempCSV"
-    dataFrame.write
-        .format(csvPackage)
-        .option("header", "true")
-        .mode(SaveMode.Overwrite)
-        .save(tempCSVFolder)
+    var writer: DataFrameWriter =
+      dataFrame.write
+          .format(csvPackage)
+          .option("header", "false")
+          .mode(SaveMode.Overwrite)
+
+    if (options.compress.equals("true")) {
+      writer = writer.option("codec", "gzip")
+    }
 
-    val sqlContext = dataFrame.sqlContext
-    val tempCSVPath = new Path(tempCSVFolder)
-    val fs = 
tempCSVPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+    writer.save(tempCSVFolder)
 
-    try {
-      sqlContext.sql(makeCreateTableString(dataFrame.schema, options))
+    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
+    val tempCSVPath = new Path(tempCSVFolder)
+    val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
 
-      // add 'csv' as file extension to all generated part file
+    def countSize(): Double = {
+      var size: Double = 0
       val itor = fs.listFiles(tempCSVPath, true)
       while (itor.hasNext) {
         val f = itor.next()
         if (f.getPath.getName.startsWith("part-")) {
-          val newPath = s"${ f.getPath.getParent }/${ f.getPath.getName }.csv"
-          if (!fs.rename(f.getPath, new Path(newPath))) {
-            sqlContext.sql(s"DROP TABLE IF EXISTS $dbName.$tableName")
-            throw new RuntimeException("File system rename failed when loading data into carbon")
-          }
+          size += f.getLen
         }
       }
-      sqlContext.sql(makeLoadString(tempCSVFolder, options))
+      size
+    }
+
+    try {
+      cc.sql(makeCreateTableString(dataFrame.schema, options))
+      logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
+      cc.sql(makeLoadString(tableName, tempCSVFolder))
     } finally {
       fs.delete(tempCSVPath, true)
     }
   }
 
-  private def csvPackage: String = "com.databricks.spark.csv"
+  private def csvPackage: String = "com.databricks.spark.csv.newapi"
 
   private def convertToCarbonType(sparkType: DataType): String = {
     sparkType match {
@@ -101,12 +107,11 @@ class DataFrameFuncs(dataFrame: DataFrame) {
       """
   }
 
-  private def makeLoadString(csvFolder: String, option: CarbonOption): String = {
-    val tableName = option.tableName
-    val dbName = option.dbName
+  private def makeLoadString(tableName: String, csvFolder: String): String = {
     s"""
           LOAD DATA INPATH '$csvFolder'
-          INTO TABLE $dbName.$tableName
+          INTO TABLE $tableName
+          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
       """
   }
 

Reply via email to