Repository: carbondata
Updated Branches:
  refs/heads/master 63afc00f6 -> 9c038543e
[CARBONDATA-1709][DataFrame] Support sort_columns option in dataframe writer

This PR adds SORT_COLUMNS option support to the DataFrame writer.

This closes #1496


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c038543
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c038543
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c038543

Branch: refs/heads/master
Commit: 9c038543e2ce68a220ad9967acf59bd8f23b6ae0
Parents: 63afc00
Author: xuchuanyin <[email protected]>
Authored: Tue Nov 14 20:23:35 2017 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Dec 13 17:23:48 2017 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala  | 63 +++++++++++++++++++-
 .../apache/carbondata/spark/CarbonOption.scala  |  2 +
 .../spark/sql/CarbonDataFrameWriter.scala       |  1 +
 3 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 57c5204..574eb91 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -22,7 +22,7 @@ import java.math.BigDecimal
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, DataFrameWriter, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
@@ -73,7 +73,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon8")
     sql("DROP TABLE IF EXISTS carbon9")
     sql("DROP TABLE IF EXISTS carbon10")
-
+    sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
+    sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
+    sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
   }
 
 
@@ -236,13 +238,68 @@ test("test the boolean data type"){
       sql("select count(*) from carbon10 where c3 > 500"), Row(500)
     )
     sql("drop table carbon10")
-    assert(!
-      new File(path).exists())
+    assert(!new File(path).exists())
     assert(intercept[AnalysisException](
       sql("select count(*) from carbon10 where c3 > 500"))
       .message
       .contains("not found"))
   }
+
+  private def getSortColumnValue(tableName: String): Array[String] = {
+    val desc = sql(s"desc formatted $tableName")
+    val sortColumnRow = desc.collect.find(r =>
+      r(0).asInstanceOf[String].trim.equalsIgnoreCase("SORT_COLUMNS")
+    )
+    assert(sortColumnRow.isDefined)
+    sortColumnRow.get.get(1).asInstanceOf[String].split(",")
+      .map(_.trim.toLowerCase).filter(_.length > 0)
+  }
+
+  private def getDefaultWriter(tableName: String): DataFrameWriter[Row] = {
+    df2.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "false")
+      .option("single_pass", "false")
+      .option("table_blocksize", "256")
+      .option("compress", "false")
+      .mode(SaveMode.Overwrite)
+  }
+
+  test("test load dataframe with sort_columns not specified," +
+    " by default all string columns will be sort_columns") {
+    // all string column will be sort_columns by default
+    getDefaultWriter("df_write_sort_column_not_specified").save()
+    checkAnswer(
+      sql("select count(*) from df_write_sort_column_not_specified where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_sort_column_not_specified")
+    assert(sortColumnValue.sameElements(Array("c1", "c2")))
+  }
+
+  test("test load dataframe with sort_columns specified") {
+    // only specify c1 as sort_columns
+    getDefaultWriter("df_write_specify_sort_column").option("sort_columns", "c1").save()
+    checkAnswer(
+      sql("select count(*) from df_write_specify_sort_column where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_specify_sort_column")
+    assert(sortColumnValue.sameElements(Array("c1")))
+  }
+
+  test("test load dataframe with sort_columns specified empty") {
+    // specify empty sort_column
+    getDefaultWriter("df_write_empty_sort_column").option("sort_columns", "").save()
+    checkAnswer(
+      sql("select count(*) from df_write_empty_sort_column where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_empty_sort_column")
+    assert(sortColumnValue.isEmpty)
+  }
+
   override def afterAll {
     dropTable
   }


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 594ea0e..bcdad26 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -42,6 +42,8 @@ class CarbonOption(options: Map[String, String]) {
 
   def singlePass: Boolean = options.getOrElse("single_pass", "false").toBoolean
 
+  def sortColumns: Option[String] = options.get("sort_columns")
+
   def dictionaryInclude: Option[String] = options.get("dictionary_include")
 
   def dictionaryExclude: Option[String] = options.get("dictionary_exclude")
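For context, here is a minimal usage sketch of the new option, mirroring the tests above; the DataFrame "df" and the table and column names are illustrative placeholders, not part of this commit:

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Write a DataFrame as a CarbonData table, sorted only on column c1.
    // "df" and the names below are placeholders for illustration.
    def writeWithSortColumns(df: DataFrame): Unit = {
      df.write
        .format("carbondata")
        .option("tableName", "sort_columns_example")
        .option("sort_columns", "c1") // the option added by this PR
        .mode(SaveMode.Overwrite)
        .save()
    }

As the tests above verify, omitting the option keeps the default behavior (all string columns become sort columns), and passing an empty string yields a table with no sort columns.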
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index ca371e1..d50f0b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -168,6 +168,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       s"${ field.name } ${ convertToCarbonType(field.dataType) }"
     }
     val property = Map(
+      "SORT_COLUMNS" -> options.sortColumns,
       "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize
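Note that the property map above is Option-valued, so a property that was not specified (None) presumably has to be dropped before the CREATE TABLE statement is generated; otherwise an unspecified sort_columns would emit an empty SORT_COLUMNS entry. A minimal sketch of that flattening, under the assumption of a Map[String, Option[String]] as in the diff (the helper name is hypothetical, not CarbonDataFrameWriter's actual code):

    // Hypothetical helper, for illustration only: keep just the properties
    // that were actually specified and render them as a TBLPROPERTIES clause.
    def tblProperties(property: Map[String, Option[String]]): String = {
      val specified = property.collect { case (key, Some(value)) => s"'$key'='$value'" }
      if (specified.isEmpty) "" else specified.mkString("TBLPROPERTIES(", ",", ")")
    }

    // tblProperties(Map("SORT_COLUMNS" -> Some("c1"), "DICTIONARY_INCLUDE" -> None))
    // returns: TBLPROPERTIES('SORT_COLUMNS'='c1')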
