Repository: carbondata
Updated Branches:
  refs/heads/master 6c097cbf3 -> da129d527


[CARBONDATA-2110] Deprecate 'tempCSV' option of dataframe load

Deprecate the 'tempCSV' option of dataframe load: the load no longer generates
a temporary CSV file on HDFS, regardless of the value of tempCSV.

This closes #1916


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da129d52
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da129d52
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da129d52

Branch: refs/heads/master
Commit: da129d5277babe498fa5686fe53d01433d112bab
Parents: 6c097cb
Author: qiuchenjian <807169...@qq.com>
Authored: Sat Feb 3 00:14:07 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sat Feb 3 15:29:08 2018 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala  | 19 ++++
 .../spark/sql/CarbonDataFrameWriter.scala       | 98 +-------------------
 2 files changed, 20 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/da129d52/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 6f03493..693c145 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -29,6 +29,7 @@ class TestLoadDataFrame extends QueryTest with 
BeforeAndAfterAll {
   var df: DataFrame = _
   var dataFrame: DataFrame = _
   var df2: DataFrame = _
+  var df3: DataFrame = _
   var booldf:DataFrame = _
 
 
@@ -52,6 +53,10 @@ class TestLoadDataFrame extends QueryTest with 
BeforeAndAfterAll {
       .map(x => ("key_" + x, "str_" + x, x, x * 2, x * 3))
       .toDF("c1", "c2", "c3", "c4", "c5")
 
+    df3 = sqlContext.sparkContext.parallelize(1 to 3)
+      .map(x => (x.toString + "te,s\nt", x))
+      .toDF("c1", "c2")
+
     val boolrdd = sqlContext.sparkContext.parallelize(
       Row("anubhav",true) ::
         Row("prince",false) :: Nil)
@@ -74,6 +79,7 @@ class TestLoadDataFrame extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon9")
     sql("DROP TABLE IF EXISTS carbon10")
     sql("DROP TABLE IF EXISTS carbon11")
+    sql("DROP TABLE IF EXISTS carbon12")
     sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
     sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
     sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
@@ -261,6 +267,19 @@ test("test the boolean data type"){
     val isStreaming: String = 
descResult.collect().find(row=>row(0).asInstanceOf[String].trim.equalsIgnoreCase("streaming")).get.get(1).asInstanceOf[String]
     assert(isStreaming.contains("true"))
   }
+
+  test("test datasource table with specified char") {
+
+    df3.write
+      .format("carbondata")
+      .option("tableName", "carbon12")
+      .option("tempCSV", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon12"), Row(3)
+    )
+  }
   private def getSortColumnValue(tableName: String): Array[String] = {
     val desc = sql(s"desc formatted $tableName")
     val sortColumnRow = desc.collect.find(r =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da129d52/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 2b06375..2be89b1 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 
 class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
@@ -46,90 +42,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val 
dataFrame: DataFrame) {
 
   private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit 
= {
     val options = new CarbonOption(parameters)
-    if (options.tempCSV) {
-      loadTempCSV(options)
-    } else {
-      loadDataFrame(options)
-    }
+    loadDataFrame(options)
   }
-
-  /**
-   * Firstly, saving DataFrame to CSV files
-   * Secondly, load CSV files
-   * @param options
-   */
-  private def loadTempCSV(options: CarbonOption): Unit = {
-    // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonProperties.getStorePath
-    val tempCSVFolder = new 
StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
-      .append("tempCSV")
-      .append(CarbonCommonConstants.UNDERSCORE)
-      
.append(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession))
-      .append(CarbonCommonConstants.UNDERSCORE)
-      .append(options.tableName)
-      .append(CarbonCommonConstants.UNDERSCORE)
-      .append(System.nanoTime())
-      .toString
-    writeToTempCSVFile(tempCSVFolder, options)
-
-    val tempCSVPath = new Path(tempCSVFolder)
-    val fs = 
tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
-
-    def countSize(): Double = {
-      var size: Double = 0
-      val itor = fs.listFiles(tempCSVPath, true)
-      while (itor.hasNext) {
-        val f = itor.next()
-        if (f.getPath.getName.startsWith("part-")) {
-          size += f.getLen
-        }
-      }
-      size
-    }
-
-    LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
-
-    try {
-      sqlContext.sql(makeLoadString(tempCSVFolder, options))
-    } finally {
-      fs.delete(tempCSVPath, true)
-    }
-  }
-
-  private def writeToTempCSVFile(tempCSVFolder: String, options: 
CarbonOption): Unit = {
-    val strRDD = dataFrame.rdd.mapPartitions { case iter =>
-      new Iterator[String] {
-        override def hasNext = iter.hasNext
-
-        def convertToCSVString(seq: Seq[Any]): String = {
-          val build = new java.lang.StringBuilder()
-          if (seq.head != null) {
-            build.append(seq.head.toString)
-          }
-          val itemIter = seq.tail.iterator
-          while (itemIter.hasNext) {
-            build.append(CarbonCommonConstants.COMMA)
-            val value = itemIter.next()
-            if (value != null) {
-              build.append(value.toString)
-            }
-          }
-          build.toString
-        }
-
-        override def next: String = {
-          convertToCSVString(iter.next.toSeq)
-        }
-      }
-    }
-
-    if (options.compress) {
-      strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
-    } else {
-      strRDD.saveAsTextFile(tempCSVFolder)
-    }
-  }
-
   /**
    * Loading DataFrame directly without saving DataFrame to CSV files.
    * @param options
@@ -189,14 +103,4 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val 
dataFrame: DataFrame) {
      """.stripMargin
   }
 
-  private def makeLoadString(csvFolder: String, options: CarbonOption): String 
= {
-    val dbName = 
CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
-    s"""
-       | LOAD DATA INPATH '$csvFolder'
-       | INTO TABLE $dbName.${options.tableName}
-       | OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
-       | 'SINGLE_PASS' = '${options.singlePass}')
-     """.stripMargin
-  }
-
 }

Reply via email to