Repository: carbondata
Updated Branches:
  refs/heads/master 4a47630d3 -> 5f3264799


[CARBONDATA-2359][SDK] Support applicable load options and table properties for 
Non-Transactional table

Support reading multiple SDK writer outputs placed at the same path

This closes #2190


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5f326479
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5f326479
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5f326479

Branch: refs/heads/master
Commit: 5f32647997209fd291d272e143d3105746af7cee
Parents: 4a47630
Author: ajantha-bhat <[email protected]>
Authored: Thu Apr 19 18:41:20 2018 +0530
Committer: Venkata Ramana G <[email protected]>
Committed: Mon Apr 23 10:27:53 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |  10 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   1 +
 .../scan/executor/util/RestructureUtilTest.java |   4 +-
 .../TestNonTransactionalCarbonTable.scala       | 163 ++++++++++++++++---
 .../management/CarbonCleanFilesCommand.scala    |   4 +
 .../sdk/file/CarbonWriterBuilder.java           |  86 ++++++++--
 .../carbondata/sdk/file/CarbonReaderTest.java   |  16 +-
 7 files changed, 244 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f326479/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 7c2e54d..8187145 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -95,7 +95,6 @@ public class TableSchemaBuilder {
     if (blockletSize > 0) {
       property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, 
String.valueOf(blockletSize));
     }
-    // TODO: check other table properties
     if (property.size() != 0) {
       schema.setTableProperties(property);
     }
@@ -119,7 +118,14 @@ public class TableSchemaBuilder {
     }
     newColumn.setSchemaOrdinal(ordinal++);
     newColumn.setColumnar(true);
-    newColumn.setColumnUniqueId(UUID.randomUUID().toString());
+
+    // For NonTransactionalTable, multiple sdk writer output with same column 
name can be placed in
+    // single folder for query.
+    // That time many places in code, columnId check will fail. To avoid that
+    // keep column ID as same as column name.
+    // Anyhow Alter table is not supported for NonTransactionalTable.
+    // SO, this will not have any impact.
+    newColumn.setColumnUniqueId(field.getFieldName());
     newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
     newColumn.setEncodingList(createEncoding(field.getDataType(), 
isSortColumn));
     if (DataTypes.isDecimal(field.getDataType())) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f326479/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 27ec202..6b4a94a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2349,6 +2349,7 @@ public final class CarbonUtil {
       fistFilePath = filePaths.get(0);
     } catch (Exception e) {
       LOGGER.error("CarbonData file is not present in the table location");
+      throw new IOException("CarbonData file is not present in the table 
location");
     }
     CarbonHeaderReader carbonHeaderReader = new 
CarbonHeaderReader(fistFilePath);
     List<ColumnSchema> columnSchemaList = carbonHeaderReader.readSchema();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f326479/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
 
b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
index cb80cd3..2768e93 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
@@ -92,7 +92,7 @@ public class RestructureUtilTest {
     List<ProjectionDimension> result = null;
     result = RestructureUtil
         
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, 
queryDimensions,
-            tableBlockDimensions, tableComplexDimensions, 
queryMeasures.size(), false);
+            tableBlockDimensions, tableComplexDimensions, 
queryMeasures.size(), true);
     List<CarbonDimension> resultDimension = new ArrayList<>(result.size());
     for (ProjectionDimension queryDimension : result) {
       resultDimension.add(queryDimension.getDimension());
@@ -127,7 +127,7 @@ public class RestructureUtilTest {
     List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, 
queryMeasure2, queryMeasure3);
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
     
RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
 queryMeasures,
-        currentBlockMeasures, false);
+        currentBlockMeasures, true);
     MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo();
     boolean[] measuresExist = { true, true, false };
     assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist)));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f326479/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 7798403..3adcec8 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.testsuite.createTable
 
 import java.io.{File, FileFilter}
+import java.util
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
@@ -32,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
 
+import scala.collection.JavaConverters._
 
 class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll 
{
 
@@ -45,22 +47,51 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   def buildTestDataSingleFile(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildTestData(3,false)
+    buildTestData(3, false, null)
   }
 
   def buildTestDataMultipleFiles(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildTestData(1000000,false)
+    buildTestData(1000000, false, null)
   }
 
   def buildTestDataTwice(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildTestData(3,false)
-    buildTestData(3,false)
+    buildTestData(3, false, null)
+    buildTestData(3, false, null)
   }
 
+  def buildTestDataSameDirectory(): Any = {
+    buildTestData(3, false, null)
+  }
+
+  def buildTestDataWithBadRecordForce(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var options = Map("bAd_RECords_action" -> "FORCE").asJava
+    buildTestData(3, false, options)
+  }
+
+  def buildTestDataWithBadRecordFail(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var options = Map("bAd_RECords_action" -> "FAIL").asJava
+    buildTestData(3, false, options)
+  }
+
+  def buildTestDataWithBadRecordIgnore(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var options = Map("bAd_RECords_action" -> "IGNORE").asJava
+    buildTestData(3, false, options)
+  }
+
+  def buildTestDataWithBadRecordRedirect(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var options = Map("bAd_RECords_action" -> "REDIRECT").asJava
+    buildTestData(3, false, options)
+  }
+
+
   // prepare sdk writer output
-  def buildTestData(rows:Int, persistSchema:Boolean): Any = {
+  def buildTestData(rows: Int, persistSchema: Boolean, options: 
util.Map[String, String]): Any = {
     val schema = new StringBuilder()
       .append("[ \n")
       .append("   {\"name\":\"string\"},\n")
@@ -80,17 +111,34 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
             .uniqueIdentifier(System.currentTimeMillis)
             .buildWriterForCSVInput()
         } else {
-          builder.withSchema(Schema.parseJson(schema))
-            .outputPath(writerPath)
-            .isTransactionalTable(false)
-            .uniqueIdentifier(System.currentTimeMillis).withBlockSize(2)
-            .buildWriterForCSVInput()
+          if (options != null) {
+            builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
+              .isTransactionalTable(false)
+              .uniqueIdentifier(
+                
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
+              .buildWriterForCSVInput()
+          } else {
+            builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
+              .isTransactionalTable(false)
+              .uniqueIdentifier(
+                System.currentTimeMillis).withBlockSize(2)
+              .buildWriterForCSVInput()
+          }
         }
       var i = 0
       while (i < rows) {
-        writer.write(Array[String]("robot" + i, String.valueOf(i), 
String.valueOf(i.toDouble / 2)))
+        if (options != null){
+          // writing a bad record
+          writer.write(Array[String]( "robot" + i, String.valueOf(i.toDouble / 
2), "robot"))
+        } else {
+          writer.write(Array[String]("robot" + i, String.valueOf(i), 
String.valueOf(i.toDouble / 2)))
+        }
         i += 1
       }
+      if (options != null) {
+        //Keep one valid record. else carbon data file will not generate
+        writer.write(Array[String]("robot" + i, String.valueOf(i), 
String.valueOf(i.toDouble / 2)))
+      }
       writer.close()
     } catch {
       case ex: Exception => None
@@ -150,7 +198,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   test("test create External Table with insert into feature")
   {
-    buildTestData(3, false)
+    buildTestDataSingleFile()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql("DROP TABLE IF EXISTS t1")
@@ -183,7 +231,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   test("test create External Table with insert overwrite")
   {
-    buildTestData(3, false)
+    buildTestDataSingleFile()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql("DROP TABLE IF EXISTS t1")
@@ -222,7 +270,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   test("test create External Table with Load")
   {
-    buildTestData(3, false)
+    buildTestDataSingleFile()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql("DROP TABLE IF EXISTS t1")
@@ -391,6 +439,14 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     assert(exception.getMessage()
       .contains("Unsupported operation on non transactional table"))
 
+    //14. Block clean files
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("clean files for table sdkOutputTable")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+
     sql("DROP TABLE sdkOutputTable")
     //drop table should not delete the files
     assert(new File(writerPath).exists())
@@ -433,8 +489,6 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     assert(exception.getMessage()
       .contains("Operation not allowed: Invalid table path provided:"))
 
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
     cleanTestData()
   }
 
@@ -457,8 +511,6 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     assert(exception.getMessage()
       .contains("Operation not allowed: Invalid table path provided:"))
 
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
     cleanTestData()
   }
 
@@ -484,13 +536,15 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))
 
+    sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
     assert(new File(writerPath).exists())
     cleanTestData()
   }
 
   test("Read two sdk writer outputs with same column name placed in same 
folder") {
-    buildTestDataTwice()
+    buildTestDataSingleFile()
+
     assert(new File(writerPath).exists())
 
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -500,17 +554,86 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
          |'$writerPath' """.stripMargin)
 
 
-    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    buildTestDataSameDirectory()
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
       Row("robot1", 1, 0.5),
       Row("robot2", 2, 1.0),
       Row("robot0", 0, 0.0),
       Row("robot1", 1, 0.5),
       Row("robot2", 2, 1.0)))
 
+    //test filter query
+    checkAnswer(sql("select * from sdkOutputTable where age = 1"), Seq(
+      Row("robot1", 1, 0.5),
+      Row("robot1", 1, 0.5)))
+
+    // test the default sort column behavior in Nontransactional table
+    checkExistence(sql("describe formatted sdkOutputTable"), true,
+      "SORT_COLUMNS                        name")
+
+    sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
     assert(new File(writerPath).exists())
     cleanTestData()
   }
 
+  test("test bad records form sdk writer") {
+
+    //1. Action = FORCE
+    buildTestDataWithBadRecordForce()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", null, null),
+      Row("robot1", null, null),
+      Row("robot2", null, null),
+      Row("robot3", 3, 1.5)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+
+
+    //2. Action = REDIRECT
+    buildTestDataWithBadRecordRedirect()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot3", 3, 1.5)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+
+    //3. Action = IGNORE
+    buildTestDataWithBadRecordIgnore()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot3", 3, 1.5)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+
+    cleanTestData()
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f326479/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index a2f3727..1c4d068 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,6 +26,7 @@ import 
org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, D
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.api.CarbonStore
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
@@ -117,6 +118,9 @@ case class CarbonCleanFilesCommand(
 
   private def cleanGarbageData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
+    }
     val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
       Seq.empty[Expression],
       sparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f326479/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index de1e5be..3e5f814 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -18,14 +18,14 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 
-import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -57,6 +58,7 @@ public class CarbonWriterBuilder {
   private int blockSize;
   private boolean isTransactionalTable;
   private long UUID;
+  private Map<String, String> options;
   private String taskNo;
 
   /**
@@ -83,7 +85,9 @@ public class CarbonWriterBuilder {
 
   /**
    * sets the list of columns that needs to be in sorted order
-   * @param sortColumns is a string array of columns that needs to be sorted
+   * @param sortColumns is a string array of columns that needs to be sorted.
+   *                    If it is null, all dimensions are selected for sorting
+   *                    If it is empty array, no columns are sorted
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder sortBy(String[] sortColumns) {
@@ -138,6 +142,52 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * To support the load options for sdk writer
+   * @param options key,value pair of load options.
+   *                supported keys values are
+   *                a. bad_records_logger_enable -- true (write into separate 
logs), false
+   *                b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
+   *                c. bad_record_path -- path
+   *                d. dateformat -- same as JAVA SimpleDateFormat
+   *                e. timestampformat -- same as JAVA SimpleDateFormat
+   *                f. complex_delimiter_level_1 -- value to Split the 
complexTypeData
+   *                g. complex_delimiter_level_2 -- value to Split the nested 
complexTypeData
+   *                h. quotechar
+   *                i. escapechar
+   *
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
+    Objects.requireNonNull(options, "Load options should not be null");
+    //validate the options.
+    if (options.size() > 9) {
+      throw new IllegalArgumentException("Supports only nine options now. "
+          + "Refer method header or documentation");
+    }
+
+    for (String option: options.keySet()) {
+      if (!option.equalsIgnoreCase("bad_records_logger_enable") &&
+          !option.equalsIgnoreCase("bad_records_action") &&
+          !option.equalsIgnoreCase("bad_record_path") &&
+          !option.equalsIgnoreCase("dateformat") &&
+          !option.equalsIgnoreCase("timestampformat") &&
+          !option.equalsIgnoreCase("complex_delimiter_level_1") &&
+          !option.equalsIgnoreCase("complex_delimiter_level_2") &&
+          !option.equalsIgnoreCase("quotechar") &&
+          !option.equalsIgnoreCase("escapechar")) {
+        throw new IllegalArgumentException("Unsupported options. "
+            + "Refer method header or documentation");
+      }
+    }
+
+    // convert it to treeMap as keys need to be case insensitive
+    Map<String, String> optionsTreeMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+    optionsTreeMap.putAll(options);
+    this.options = optionsTreeMap;
+    return this;
+  }
+
+  /**
    * To set the carbondata file size in MB between 1MB-2048MB
    * @param blockSize is size in MB between 1MB to 2048 MB
    * @return updated CarbonWriterBuilder
@@ -194,7 +244,7 @@ public class CarbonWriterBuilder {
     }
 
     // build LoadModel
-    return buildLoadModel(table, UUID, taskNo);
+    return buildLoadModel(table, UUID, taskNo, options);
   }
 
   /**
@@ -210,11 +260,22 @@ public class CarbonWriterBuilder {
       tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize);
     }
 
-    List<String> sortColumnsList;
-    if (sortColumns != null) {
-      sortColumnsList = Arrays.asList(sortColumns);
+    List<String> sortColumnsList = new ArrayList<>();
+    if (sortColumns == null) {
+      // If sort columns are not specified, default set all dimensions to sort 
column.
+      // When dimensions are default set to sort column,
+      // Inverted index will be supported by default for sort columns.
+      for (Field field : schema.getFields()) {
+        if (field.getDataType() == DataTypes.STRING ||
+            field.getDataType() == DataTypes.DATE ||
+            field.getDataType() == DataTypes.TIMESTAMP) {
+          sortColumnsList.add(field.getFieldName());
+        }
+      }
+      sortColumns = new String[sortColumnsList.size()];
+      sortColumns = sortColumnsList.toArray(sortColumns);
     } else {
-      sortColumnsList = new LinkedList<>();
+      sortColumnsList = Arrays.asList(sortColumns);
     }
     for (Field field : schema.getFields()) {
       tableSchemaBuilder.addColumn(
@@ -275,11 +336,10 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, String 
taskNo)
-      throws InvalidLoadOptionException, IOException {
-    Map<String, String> options = new HashMap<>();
-    if (sortColumns != null) {
-      options.put("sort_columns", Strings.mkString(sortColumns, ","));
+  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, String 
taskNo,
+      Map<String, String> options) throws InvalidLoadOptionException, 
IOException {
+    if (options == null) {
+      options = new HashMap<>();
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
     return builder.build(options, UUID, taskNo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f326479/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 5f7ef6a..bb1a7c6 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -46,13 +46,23 @@ public class CarbonReaderTest {
     CarbonReader reader = CarbonReader.builder(path, "_temp")
         .projection(new String[]{"name", "age"}).build();
 
+    // expected output after sorting
+    String[] name = new String[100];
+    int[] age = new int[100];
+    for (int i = 0; i < 100; i++) {
+      name[i] = "robot" + (i / 10);
+      age[i] = (i % 10) * 10 + i / 10;
+    }
+
     int i = 0;
     while (reader.hasNext()) {
-      Object[] row = (Object[])reader.readNextRow();
-      Assert.assertEquals("robot" + (i % 10), row[0]);
-      Assert.assertEquals(i, row[1]);
+      Object[] row = (Object[]) reader.readNextRow();
+      // Default sort column is applied for dimensions. So, need  to validate 
accordingly
+      Assert.assertEquals(name[i], row[0]);
+      Assert.assertEquals(age[i], row[1]);
       i++;
     }
+    Assert.assertEquals(i, 100);
 
     FileUtils.deleteDirectory(new File(path));
   }

Reply via email to