[CARBONDATA-2001] Unable to save DataFrame as CarbonData stream table. 1. Added a table property for streaming in CarbonDataFrameWriter. 2. Added a test case for the same.
This closes #1774 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bef6af30 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bef6af30 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bef6af30 Branch: refs/heads/carbonstore Commit: bef6af30e5959a6edbaf95cd004da90ad1e6d646 Parents: bcc9cf0 Author: anubhav100 <[email protected]> Authored: Mon Jan 8 13:17:36 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Fri Jan 19 21:27:58 2018 +0800 ---------------------------------------------------------------------- .../testsuite/dataload/TestLoadDataFrame.scala | 18 +++++++++++++++++- .../apache/spark/sql/CarbonDataFrameWriter.scala | 12 +++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/bef6af30/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala index 574eb91..6f03493 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala @@ -73,6 +73,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS carbon8") sql("DROP TABLE IF EXISTS carbon9") sql("DROP TABLE IF EXISTS carbon10") + sql("DROP TABLE IF EXISTS carbon11") sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified") sql("DROP TABLE IF EXISTS 
df_write_specify_sort_column") sql("DROP TABLE IF EXISTS df_write_empty_sort_column") @@ -244,7 +245,22 @@ test("test the boolean data type"){ .message .contains("not found")) } - + test("test streaming Table") { + dataFrame.write + .format("carbondata") + .option("tableName", "carbon11") + .option("tempCSV", "true") + .option("single_pass", "false") + .option("compress", "false") + .option("streaming", "true") + .mode(SaveMode.Overwrite) + .save() + checkAnswer( + sql("SELECT decimal FROM carbon11"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44)))) + val descResult =sql("desc formatted carbon11") + val isStreaming: String = descResult.collect().find(row=>row(0).asInstanceOf[String].trim.equalsIgnoreCase("streaming")).get.get(1).asInstanceOf[String] + assert(isStreaming.contains("true")) + } private def getSortColumnValue(tableName: String): Array[String] = { val desc = sql(s"desc formatted $tableName") val sortColumnRow = desc.collect.find(r => http://git-wip-us.apache.org/repos/asf/carbondata/blob/bef6af30/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index d50f0b8..2b06375 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -167,19 +167,25 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { val carbonSchema = schema.map { field => s"${ field.name } ${ convertToCarbonType(field.dataType) }" } + val property = Map( "SORT_COLUMNS" -> options.sortColumns, "DICTIONARY_INCLUDE" -> options.dictionaryInclude, "DICTIONARY_EXCLUDE" -> options.dictionaryExclude, - "TABLE_BLOCKSIZE" -> 
options.tableBlockSize - ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",") + "TABLE_BLOCKSIZE" -> options.tableBlockSize, + "STREAMING" -> Option(options.isStreaming.toString) + ).filter(_._2.isDefined) + .map(property => s"'${property._1}' = '${property._2.get}'").mkString(",") + val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession) + s""" | CREATE TABLE IF NOT EXISTS $dbName.${options.tableName} | (${ carbonSchema.mkString(", ") }) | STORED BY 'carbondata' - | ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" } | ${ if (options.tablePath.nonEmpty) s"LOCATION '${options.tablePath.get}'" else ""} + | ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" } + | """.stripMargin }
