Repository: incubator-carbondata Updated Branches: refs/heads/12-dev 222a80762 -> 974cb1e58
Added options to include and exclude dictionary columns in dataframe Fixed a bug in spark2 dataframe writer that caused an error when only dictionary_include is specified Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/84ed893d Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/84ed893d Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/84ed893d Branch: refs/heads/12-dev Commit: 84ed893d716ae09d7543bea99e3464ff68f3420b Parents: 222a807 Author: Sanoj MG <sanoj.george....@gmail.com> Authored: Wed Apr 12 11:21:15 2017 +0400 Committer: Sanoj MG <sanoj.george....@gmail.com> Committed: Wed Apr 12 11:21:15 2017 +0400 ---------------------------------------------------------------------- .../testsuite/dataload/TestLoadDataFrame.scala | 48 +++++++++++++++++++- .../spark/CarbonDataFrameWriter.scala | 5 ++ .../spark/sql/CarbonDataFrameWriter.scala | 19 ++------ 3 files changed, 56 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/84ed893d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala index 3b0fd4a..6e79a10 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala @@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterAll class TestLoadDataFrame extends 
QueryTest with BeforeAndAfterAll { var df: DataFrame = _ var dataFrame: DataFrame = _ + var df2: DataFrame = _ def buildTestData() = { @@ -45,6 +46,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { StructField("string", StringType, nullable = false) :: Nil) dataFrame = sqlContext.createDataFrame(rdd, schema) + df2 = sqlContext.sparkContext.parallelize(1 to 1000) + .map(x => ("key_" + x, "str_" + x, x, x * 2, x * 3)) + .toDF("c1", "c2", "c3", "c4", "c5") } def dropTable() = { @@ -52,7 +56,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS carbon2") sql("DROP TABLE IF EXISTS carbon3") sql("DROP TABLE IF EXISTS carbon4") - + sql("DROP TABLE IF EXISTS carbon5") + sql("DROP TABLE IF EXISTS carbon6") + sql("DROP TABLE IF EXISTS carbon7") } @@ -114,6 +120,46 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { sql("SELECT decimal FROM carbon4"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44)))) } + test("test load dataframe with integer columns included in the dictionary"){ + df2.write + .format("carbondata") + .option("tableName", "carbon5") + .option("compress", "true") + .option("dictionary_include","c3,c4") + .mode(SaveMode.Overwrite) + .save() + checkAnswer( + sql("select count(*) from carbon5 where c3 > 300"), Row(700) + ) + } + + test("test load dataframe with string column excluded from the dictionary"){ + df2.write + .format("carbondata") + .option("tableName", "carbon6") + .option("compress", "true") + .option("dictionary_exclude","c2") + .mode(SaveMode.Overwrite) + .save() + checkAnswer( + sql("select count(*) from carbon6 where c3 > 300"), Row(700) + ) + } + + test("test load dataframe with both dictionary include and exclude specified"){ + df2.write + .format("carbondata") + .option("tableName", "carbon7") + .option("compress", "true") + .option("dictionary_include","c3,c4") + .option("dictionary_exclude","c2") + .mode(SaveMode.Overwrite) + .save() + 
checkAnswer( + sql("select count(*) from carbon7 where c3 > 300"), Row(700) + ) + } + override def afterAll { dropTable } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/84ed893d/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala index 0e2e4dd..9f813a8 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala @@ -175,6 +175,10 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) { } private def makeCreateTableString(schema: StructType, options: CarbonOption): String = { + val properties = Map( + "DICTIONARY_INCLUDE" -> options.dictionaryInclude, + "DICTIONARY_EXCLUDE" -> options.dictionaryExclude + ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",") val carbonSchema = schema.map { field => s"${ field.name } ${ convertToCarbonType(field.dataType) }" } @@ -182,6 +186,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) { CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName} (${ carbonSchema.mkString(", ") }) STORED BY '${ CarbonContext.datasourceName }' + ${ if (properties.nonEmpty) " TBLPROPERTIES (" + properties + ")" else ""} """ } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/84ed893d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index e6efeaa..576da58 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -162,21 +162,10 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { val carbonSchema = schema.map { field => s"${ field.name } ${ convertToCarbonType(field.dataType) }" } - val property = new StringBuilder - property.append( - if (options.dictionaryInclude.isDefined) { - s"'DICTIONARY_INCLUDE' = '${options.dictionaryInclude.get}' ," - } else { - "" - } - ).append( - if (options.dictionaryExclude.isDefined) { - s"'DICTIONARY_EXCLUDE' = '${options.dictionaryExclude.get}'" - } else { - "" - } - ) - + val property = Map( + "DICTIONARY_INCLUDE" -> options.dictionaryInclude, + "DICTIONARY_EXCLUDE" -> options.dictionaryExclude + ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",") s""" | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName} | (${ carbonSchema.mkString(", ") })