Repository: carbondata
Updated Branches:
  refs/heads/master 54dcd8d5b -> a9cc43411
[CARBONDATA-2922] Support long string columns with Spark FileFormat and SDK via the long_string_columns table property

Problem: An exception is thrown when a long string column is used with the Spark file format.
Solution: For the Varchar data type the corresponding Spark data type was being selected, hence the exception. Spark does not impose a length limit on its string data type, so Varchar is mapped to the Spark string type. Tested with strings longer than 32K characters and added a UT.
Also supported the long_string_columns table property for the SDK (required for AVRO) and the Spark file format.

This closes #2690


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a9cc4341
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a9cc4341
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a9cc4341

Branch: refs/heads/master
Commit: a9cc434114805da56fe841a560dcf350e86cad52
Parents: 54dcd8d
Author: ajantha-bhat <[email protected]>
Authored: Fri Sep 7 17:45:30 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Mon Sep 10 17:23:23 2018 +0530

----------------------------------------------------------------------
 docs/sdk-guide.md                               |  7 +-
 .../TestNonTransactionalCarbonTable.scala       |  2 +-
 .../datasources/CarbonSparkDataSourceUtil.scala |  6 ++
 .../spark/sql/util/SparkTypeConverter.scala     |  1 +
 ...tCreateTableUsingSparkCarbonFileFormat.scala | 67 +++++++++++++
 .../sdk/file/CarbonWriterBuilder.java           | 99 +++++++++++++++-----
 .../org/apache/carbondata/sdk/file/Field.java   |  5 +
 7 files changed, 161 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9cc4341/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index dd28e51..c80cc75 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -339,8 +339,7 @@ public CarbonWriterBuilder taskNo(long taskNo);
 * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
 * h. quotechar
 * i. escapechar
-* j. sort_scope -- "local_sort", "no_sort", "batch_sort"
-*
+* 
 * Default values are as follows.
 *
 * a. bad_records_logger_enable -- "false"
@@ -352,7 +351,6 @@ public CarbonWriterBuilder taskNo(long taskNo);
 * g. complex_delimiter_level_2 -- ":"
 * h. quotechar -- "\""
 * i. escapechar -- "\\"
-* j. sort_scope -- "local_sort"
 *
 * @return updated CarbonWriterBuilder
 */
@@ -370,6 +368,9 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
 * c. local_dictionary_threshold -- positive value, default is 10000
 * d. local_dictionary_enable -- true / false. Default is false
 * e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
+* j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is "local_sort"
+* k. long_string_columns -- comma separated string columns which are more than 32k length.
+*    default value is null.
 *
 * @return updated CarbonWriterBuilder
 */


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9cc4341/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 29ea755..0b6813f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -2365,7 +2365,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     fields(1) = new Field("intField", DataTypes.INT)
     val writer: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
-      .withLoadOptions(options)
+      .withTableProperties(options)
       .buildWriterForCSVInput(new Schema(fields))
     writer.write(Array("carbon", "1"))
     writer.write(Array("hydrogen", "10"))


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9cc4341/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index b097320..00a5139 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -228,6 +228,12 @@ object CarbonSparkDataSourceUtil {
       case _ => null
     }
     builder.sortBy(sortCols)
+    val longStringColumns: String = options
+      .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, null)
+    if (longStringColumns != null) {
+      val loadOptions = Map(CarbonCommonConstants.LONG_STRING_COLUMNS -> longStringColumns).asJava
+      builder.withTableProperties(loadOptions)
+    }
     builder.uniqueIdentifier(System.currentTimeMillis())
     val model = builder.buildLoadModel(schema)
     val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9cc4341/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
index 8145953..1138a29 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
@@ -83,6 +83,7 @@ private[spark] object SparkTypeConverter {
       case CarbonDataTypes.BOOLEAN => BooleanType
       case CarbonDataTypes.TIMESTAMP => TimestampType
       case CarbonDataTypes.DATE => DateType
+      case CarbonDataTypes.VARCHAR => StringType
     }
   }
 }


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9cc4341/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index a0c4a0b..26f67f8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.carbondata.datasource
 import java.io.File
 
 import org.apache.commons.io.FileUtils
+import org.apache.commons.lang.RandomStringUtils
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.apache.spark.util.SparkUtil
@@ -319,4 +320,70 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     assert(new File(filePath).exists())
     cleanTestData()
   }
+
+  test("Test with long string columns") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    // here we specify the long string column as varchar
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"address\":\"varchar\"},\n")
+      .append("   {\"age\":\"int\"}\n")
+      .append("]")
+      .toString()
+    val builder = CarbonWriter.builder()
+    val writer = builder.outputPath(writerPath)
+      .buildWriterForCSVInput(Schema.parseJson(schema))
+    for (i <- 0 until 3) {
+      // write a varchar with 75,000 length
+      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
+    }
+    writer.close()
+
+    //--------------- data source external table with schema ---------------------------
+    spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
+    if (spark.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      spark.sql(
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
+           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
+          .stripMargin)
+    } else if (spark.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      spark.sql(
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
+           |OPTIONS("long_String_columns"="address") LOCATION
+           |'$writerPath' """.stripMargin)
+    } else {
+      // TODO. spark2.3 ?
+      assert(false)
+    }
+    assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
+    val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
+    assert(op.get(0).getString(0).length == 75000)
+    spark.sql("DROP TABLE sdkOutputTable")
+
+    //--------------- data source external table without schema ---------------------------
+    spark.sql("DROP TABLE IF EXISTS sdkOutputTableWithoutSchema")
+    if (spark.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      spark
+        .sql(
+          s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
+             |'$writerPath', "long_String_columns" "address") """.stripMargin)
+    } else if (spark.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      spark.sql(
+        s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
+           |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
+    } else {
+      // TODO. spark2.3 ?
+      assert(false)
+    }
+    assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
+    val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
+    assert(op1.get(0).getString(0).length == 75000)
+    spark.sql("DROP TABLE sdkOutputTableWithoutSchema")
+    cleanTestData()
+  }
 }


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9cc4341/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 56757e4..89e69fb 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -240,7 +240,6 @@ public class CarbonWriterBuilder {
   * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
   * h. quotechar
   * i. escapechar
-  * j. sort_scope -- "local_sort", "no_sort", "batch_sort"
   *
   * Default values are as follows.
   *
@@ -253,7 +252,6 @@ public class CarbonWriterBuilder {
   * g. complex_delimiter_level_2 -- ":"
   * h. quotechar -- "\""
   * i. escapechar -- "\\"
-  * j. sort_scope -- "local_sort"
   *
   * @return updated CarbonWriterBuilder
   */
@@ -269,26 +267,17 @@ public class CarbonWriterBuilder {
           !option.equalsIgnoreCase("complex_delimiter_level_1") &&
           !option.equalsIgnoreCase("complex_delimiter_level_2") &&
           !option.equalsIgnoreCase("quotechar") &&
-          !option.equalsIgnoreCase("escapechar") &&
-          !option.equalsIgnoreCase("sort_scope")) {
+          !option.equalsIgnoreCase("escapechar")) {
         throw new IllegalArgumentException("Unsupported option:" + option
             + ". Refer method header or documentation");
       }
     }
 
-    // validate sort scope
-    String sortScope = options.get("sort_scope");
-    if (sortScope != null) {
-      if ((!CarbonUtil.isValidSortOption(sortScope))) {
-        throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope);
-      } else if (sortScope.equalsIgnoreCase("global_sort")) {
-        throw new IllegalArgumentException("global sort is not supported");
-      }
-    }
-
-    // convert it to treeMap as keys need to be case insensitive
-    Map<String, String> optionsTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-    optionsTreeMap.putAll(options);
-    this.options = optionsTreeMap;
+    if (this.options == null) {
+      // convert it to treeMap as keys need to be case insensitive
+      this.options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+    }
+    this.options.putAll(options);
     return this;
   }
@@ -302,6 +291,9 @@ public class CarbonWriterBuilder {
   * c. local_dictionary_threshold -- positive value, default is 10000
   * d. local_dictionary_enable -- true / false. Default is false
   * e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
+  * j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is "local_sort"
+  * k. long_string_columns -- comma separated string columns which are more than 32k length.
+  *    default value is null.
   *
   * @return updated CarbonWriterBuilder
   */
@@ -315,7 +307,7 @@ public class CarbonWriterBuilder {
     Set<String> supportedOptions = new HashSet<>(Arrays
         .asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
-            "local_dictionary_enable", "sort_columns"));
+            "local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns"));
 
     for (String key : options.keySet()) {
       if (!supportedOptions.contains(key.toLowerCase())) {
@@ -337,6 +329,10 @@ public class CarbonWriterBuilder {
         //sort columns
         String[] sortColumns = entry.getValue().split(",");
         this.sortBy(sortColumns);
+      } else if (entry.getKey().equalsIgnoreCase("sort_scope")) {
+        this.withSortScope(entry);
+      } else if (entry.getKey().equalsIgnoreCase("long_string_columns")) {
+        updateToLoadOptions(entry);
       }
     }
     return this;
   }
@@ -548,7 +544,13 @@ public CarbonLoadModel buildLoadModel(Schema carbonSchema)
       throws IOException, InvalidLoadOptionException {
-    this.schema = schemaFieldNameToLowerCase(carbonSchema);
+    Set<String> longStringColumns = null;
+    if (options != null && options.get("long_string_columns") != null) {
+      longStringColumns =
+          new HashSet<>(Arrays.asList(options.get("long_string_columns").toLowerCase().split(",")));
+      validateLongStringColumns(carbonSchema, longStringColumns);
+    }
+    this.schema = updateSchemaFields(carbonSchema, longStringColumns);
     // build CarbonTable using schema
     CarbonTable table = buildCarbonTable();
     if (persistSchemaFile) {
@@ -559,6 +561,28 @@
     return buildLoadModel(table, UUID, taskNo, options);
   }
+  private void validateLongStringColumns(Schema carbonSchema, Set<String> longStringColumns) {
+    // long string columns must be string or varchar type
+    for (Field field :carbonSchema.getFields()) {
+      if (longStringColumns.contains(field.getFieldName().toLowerCase()) && (
+          (field.getDataType() != DataTypes.STRING) && field.getDataType() != DataTypes.VARCHAR)) {
+        throw new RuntimeException(
+            "long string column : " + field.getFieldName() + "is not supported for data type: "
+                + field.getDataType());
+      }
+    }
+    // long string columns must not be present in sort columns
+    if (sortColumns != null) {
+      for (String col : sortColumns) {
+        // already will be in lower case
+        if (longStringColumns.contains(col)) {
+          throw new RuntimeException(
+              "long string column : " + col + "must not be present in sort columns");
+        }
+      }
+    }
+  }
+
   /**
    * Build a {@link CarbonTable}
    */
@@ -736,9 +760,11 @@ public class CarbonWriterBuilder {
     return build;
   }
 
-  /* loop through all the parent column and change fields name lower case.
-   * this is to match with sort column case */
-  private Schema schemaFieldNameToLowerCase(Schema schema) {
+  /* loop through all the parent column and
+     a) change fields name to lower case.
+     this is to match with sort column case.
+     b) change string fields to varchar type */
+  private Schema updateSchemaFields(Schema schema, Set<String> longStringColumns) {
     if (schema == null) {
       return null;
     }
@@ -747,7 +773,36 @@ public class CarbonWriterBuilder {
       if (fields[i] != null) {
         fields[i].updateNameToLowerCase();
       }
+
+      if (longStringColumns != null) {
+        /* Also update the string type to varchar */
+        if (longStringColumns.contains(fields[i].getFieldName())) {
+          fields[i].updateDataTypeToVarchar();
+        }
+      }
     }
     return new Schema(fields);
   }
+
+  private void updateToLoadOptions(Map.Entry<String, String> entry) {
+    if (this.options == null) {
+      // convert it to treeMap as keys need to be case insensitive
+      this.options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+    }
+    // update it to load options
+    this.options.put(entry.getKey(), entry.getValue());
+  }
+
+  private void withSortScope(Map.Entry<String, String> entry) {
+    String sortScope = entry.getValue();
+    if (sortScope != null) {
+      if ((!CarbonUtil.isValidSortOption(sortScope))) {
+        throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope);
+      } else if (sortScope.equalsIgnoreCase("global_sort")) {
+        throw new IllegalArgumentException("global sort is not supported");
+      }
+    }
+    // update it to load options
+    updateToLoadOptions(entry);
+  }
 }


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9cc4341/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 6903200..924835f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -219,6 +219,11 @@ public class Field {
     this.columnComment = columnComment;
   }
 
+  /* for SDK, change string type to varchar by default for parent columns */
+  public void updateDataTypeToVarchar() {
+    this.type = DataTypes.VARCHAR;
+  }
+
   /*can use to change the case of the schema */
   public void updateNameToLowerCase() {
     this.name = name.toLowerCase();
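
For illustration only, a minimal sketch of driving the new table properties through the SDK writer, pieced together from the builder and test changes above; the output path, column names, and written data are placeholders and are not part of this commit:

import scala.collection.JavaConverters._
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

// placeholder output directory for the sketch
val writerPath = "/tmp/carbon_long_string_output"

val fields = new Array[Field](2)
fields(0) = new Field("name", DataTypes.STRING)
fields(1) = new Field("address", DataTypes.STRING)

// sort_scope and long_string_columns are now accepted by withTableProperties;
// note that a long string column must not also appear in sort_columns
val tableProperties = Map(
  "sort_columns" -> "name",
  "sort_scope" -> "local_sort",
  "long_string_columns" -> "address").asJava

val writer = CarbonWriter.builder()
  .outputPath(writerPath)
  .withTableProperties(tableProperties)
  .buildWriterForCSVInput(new Schema(fields))

// values longer than 32000 characters are allowed for the long string column
writer.write(Array[String]("bob", "x" * 40000))
writer.close()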

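Likewise, a sketch of the query side through the Spark carbon file format, mirroring the Spark 2.2 variant of the added test; the table name is a placeholder and writerPath points at the directory written by the SDK sketch above:

// Spark 2.2 style DDL over files written by the SDK
spark.sql(
  s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
     |OPTIONS("long_String_columns"="address") LOCATION '$writerPath'""".stripMargin)

// the varchar column comes back as a plain Spark string, with no 32k limit
spark.sql("SELECT length(address) FROM sdkOutputTable").show()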