This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d6258  [CARBONDATA-3553] Support SDK Writer using existing schema file
49d6258 is described below

commit 49d62587d4db4d9afa9a8e52078a7af0ae46d58a
Author: liuzhi <[email protected]>
AuthorDate: Mon Oct 21 17:04:52 2019 +0800

    [CARBONDATA-3553] Support SDK Writer using existing schema file
    
    Sometimes, users need to build the SDK writer from an existing schema file rather than from a Schema object, which makes the writer easier to use.
    
    ```
    val writer = CarbonWriter.builder
                   .outputPath(...)
                   .withSchemaFile("/default/test/Metadata/schema")
                   .withCsvInput()
                   .build()
    ```
    
    This closes #3415
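
For context, here is a fuller end-to-end sketch of the new writer flow in Java, based only on the APIs touched by this commit. The class name, output folder, schema file location and CSV path are illustrative placeholders; the schema file is assumed to come from an existing transactional table.

```
import java.io.BufferedReader;
import java.io.FileReader;

import org.apache.carbondata.sdk.file.CarbonWriter;

public class SchemaFileWriterExample {
  public static void main(String[] args) throws Exception {
    // Placeholder locations; point these at a real output folder, schema file and CSV file.
    String outputPath = "/tmp/external_segment";
    String schemaFilePath = "/default/test/Metadata/schema";
    String csvPath = "/tmp/data.csv";

    CarbonWriter writer = CarbonWriter.builder()
        .outputPath(outputPath)            // must be set before withSchemaFile()
        .withSchemaFile(schemaFilePath)    // schema is read from the existing schema file
        .writtenBy("SchemaFileWriterExample")
        .withCsvInput()                    // no Schema argument needed any more
        .build();

    try (BufferedReader reader = new BufferedReader(new FileReader(csvPath))) {
      String line = reader.readLine();     // skip the CSV header
      while ((line = reader.readLine()) != null) {
        writer.write(line.split(","));     // one row per call, in schema column order
      }
    }
    writer.close();
  }
}
```

Note that outputPath() has to be called before withSchemaFile(): the builder rewires the table path read from the schema file to the writer's output folder and marks the table as non-transactional.
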
---
 .../core/metadata/converter/SchemaConverter.java   |  13 +-
 .../core/metadata/schema/SchemaReader.java         |  39 +++++
 .../core/metadata/schema/table/CarbonTable.java    |   1 +
 .../apache/carbondata/core/util/CarbonUtil.java    |  22 ++-
 .../carbondata/hadoop/api/CarbonInputFormat.java   |   6 +-
 .../readsupport/impl/CarbonRowReadSupport.java     |   1 +
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 158 ++++++++++++++++++++-
 .../sdk/TestSDKWithTransactionalTable.scala        | 114 +++++++++++++++
 .../loading/model/CarbonLoadModelBuilder.java      |   2 +
 store/sdk/pom.xml                                  |  15 +-
 .../carbondata/sdk/file/CarbonReaderBuilder.java   |  19 ++-
 .../carbondata/sdk/file/CarbonSchemaReader.java    |  18 +++
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |  72 +++++++++-
 .../org/apache/carbondata/sdk/file/Schema.java     |  27 ++++
 14 files changed, 480 insertions(+), 27 deletions(-)
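
The patch also adds CarbonReaderBuilder.withReadSupport(), which the new AddSegmentTestCase uses to read rows back as CarbonRow. Below is a minimal Java sketch of that read path, assuming a segment folder written by the SDK as in the example above and a reduced projection (the class name and path are placeholders):

```
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
import org.apache.carbondata.sdk.file.CarbonReader;

public class SchemaFileReaderExample {
  public static void main(String[] args) throws Exception {
    // Placeholder path to the segment folder written by the SDK writer.
    String pathToRead = "/tmp/external_segment";

    CarbonReader<Object> reader = CarbonReader.builder(pathToRead)
        .withRowRecordReader()                        // row-based reader instead of the vector reader
        .withReadSupport(CarbonRowReadSupport.class)  // new hook: decode each row as a CarbonRow
        .projection(new String[]{"empno", "empname", "salary"})
        .build();

    while (reader.hasNext()) {
      CarbonRow row = (CarbonRow) reader.readNextRow();
      System.out.println(java.util.Arrays.toString(row.getData()));
    }
    reader.close();
  }
}
```

Without withReadSupport(), readNextRow() keeps its existing behaviour and returns the raw Object[] row, as in the other new test.
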

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
index 4f34070..4d8d739 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
@@ -94,16 +94,19 @@ public interface SchemaConverter {
       org.apache.carbondata.format.TableSchema externalTableSchema, String tableNam);
 
   /**
-   * @param externalTableInfo
-   * @param dbName
-   * @param tableName
-   * @return
+   * method to convert thrift table info object to wrapper table info
+   *
+   * @param externalTableInfo thrift table info object
+   * @param dbName database name
+   * @param tableName table name
+   * @param tablePath table path
+   * @return TableInfo
    */
   TableInfo fromExternalToWrapperTableInfo(
       org.apache.carbondata.format.TableInfo externalTableInfo,
       String dbName,
       String tableName,
-      String storePath);
+      String tablePath);
 
   /**
    * method to convert thrift datamap schema object to wrapper
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 4cd4195..9d975e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -59,6 +59,45 @@ public class SchemaReader {
   }
 
   /**
+   * Read specified schema file as CarbonTable
+   * @param schemaFilePath schema file path
+   * @param conf hadoop configuration
+   * @return CarbonTable object
+   * @throws IOException if IO error occurs
+   */
+  public static CarbonTable readCarbonTableFromSchema(String schemaFilePath, Configuration conf)
+      throws IOException {
+    TableInfo tableInfo = readTableInfoFromSchema(schemaFilePath, conf);
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
+    return CarbonMetadata.getInstance().getCarbonTable("dummy_dummy");
+  }
+
+  /**
+   * Read specified schema file as TableInfo
+   * @param schemaFilePath schema file path
+   * @param conf hadoop configuration
+   * @return TableInfo object
+   * @throws IOException if IO error occurs
+   */
+  private static TableInfo readTableInfoFromSchema(String schemaFilePath, Configuration conf)
+      throws IOException {
+    if (FileFactory.isFileExist(schemaFilePath)) {
+      String tableName = "dummy";
+
+      org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.readSchemaFile(schemaFilePath);
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+          tableInfo,
+          "dummy",
+          tableName,
+          "dummy");
+      return wrapperTableInfo;
+    } else {
+      throw new IOException("File does not exist: " + schemaFilePath);
+    }
+  }
+
+  /**
    * the method returns the Wrapper TableInfo
    *
    * @param identifier
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index aa82b64..f70d6fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1152,6 +1152,7 @@ public class CarbonTable implements Serializable, Writable {
 
   public void setTransactionalTable(boolean transactionalTable) {
     isTransactionalTable = transactionalTable;
+    getTableInfo().setTransactionalTable(transactionalTable);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 19d142e..66618a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2040,13 +2040,19 @@ public final class CarbonUtil {
    */
   public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath)
       throws IOException {
+    return readSchemaFile(schemaFilePath, FileFactory.getConfiguration());
+  }
+
+  public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath,
+      Configuration conf)
+      throws IOException {
     TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
       public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
           org.apache.carbondata.format.TableInfo._Fields> create() {
         return new org.apache.carbondata.format.TableInfo();
       }
     };
-    ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase);
+    ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, conf);
     thriftReader.open();
     org.apache.carbondata.format.TableInfo tableInfo =
         (org.apache.carbondata.format.TableInfo) thriftReader.read();
@@ -2541,12 +2547,14 @@ public final class CarbonUtil {
   }
 
   // Get the total size of carbon data and the total size of carbon index
-  private static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
-      String segmentId) throws IOException {
+  public static HashMap<String, Long> getDataSizeAndIndexSize(
+      String segmentPath) throws IOException {
+    if (segmentPath == null) {
+      throw new IllegalArgumentException("Argument [segmentPath] is null.");
+    }
     long carbonDataSize = 0L;
     long carbonIndexSize = 0L;
     HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
-    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
     FileFactory.FileType fileType = FileFactory.getFileType(segmentPath);
     switch (fileType) {
       case HDFS:
@@ -2595,6 +2603,12 @@ public final class CarbonUtil {
   }
 
   // Get the total size of carbon data and the total size of carbon index
+  private static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
+      String segmentId) throws IOException {
+    return getDataSizeAndIndexSize(CarbonTablePath.getSegmentPath(tablePath, segmentId));
+  }
+
+  // Get the total size of carbon data and the total size of carbon index
   public static HashMap<String, Long> getDataSizeAndIndexSize(SegmentFileStore fileStore)
       throws IOException {
     long carbonDataSize = 0L;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 56ccabc..42fae67 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -310,7 +310,7 @@ m filterExpression
   public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
     String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
     String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
-    getQuerySegmentToAccess(conf, dbName, tbName);
+    setQuerySegmentToAccess(conf, dbName, tbName);
   }
 
   /**
@@ -898,7 +898,7 @@ m filterExpression
     return projectColumns.toArray(new String[projectColumns.size()]);
   }
 
-  private static void getQuerySegmentToAccess(Configuration conf, String dbName, String tableName) {
+  private static void setQuerySegmentToAccess(Configuration conf, String dbName, String tableName) {
     String segmentNumbersFromProperty = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tableName, "*");
     if (!segmentNumbersFromProperty.trim().equals("*")) {
@@ -912,7 +912,7 @@ m filterExpression
    */
   public static void setQuerySegment(Configuration conf, CarbonTable carbonTable) {
     String tableName = carbonTable.getTableName();
-    getQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
+    setQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
   }
 
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
index e2b5e60..1c8503d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
@@ -41,6 +41,7 @@ public class CarbonRowReadSupport extends DictionaryDecodeReadSupport<CarbonRow>
         Calendar c = Calendar.getInstance();
         c.setTime(new Date(0));
         c.add(Calendar.DAY_OF_YEAR, (Integer) data[i]);
+        c.add(Calendar.DATE, 1);
         data[i] = new Date(c.getTime().getTime());
       } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
         data[i] = new Timestamp((long) data[i] / 1000);
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 50927ee..c4acf8e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -24,13 +24,18 @@ import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
 import org.scalatest.BeforeAndAfterAll
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
+import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.junit.Assert
+
+import scala.io.Source
 
 class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -258,6 +263,157 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
+  test("Test add segment by carbon written by sdk") {
+    val tableName = "add_segment_test"
+    sql(s"drop table if exists $tableName")
+    sql(
+      s"""
+        | CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
+        | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        | utilization int,salary int)
+        | STORED AS carbondata
+        |""".stripMargin)
+
+    val externalSegmentPath = storeLocation + "/" + "external_segment"
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+
+    // write into external segment folder
+    val schemaFilePath = s"$storeLocation/$tableName/Metadata/schema"
+    val writer = CarbonWriter.builder
+      .outputPath(externalSegmentPath)
+      .withSchemaFile(schemaFilePath)
+      .writtenBy("AddSegmentTestCase")
+      .withCsvInput()
+      .build()
+    val source = Source.fromFile(s"$resourcesPath/data.csv")
+    var count = 0
+    for (line <- source.getLines()) {
+      if (count != 0) {
+        writer.write(line.split(","))
+      }
+      count = count + 1
+    }
+    writer.close()
+
+    sql(s"alter table $tableName add segment 
options('path'='$externalSegmentPath', 'format'='carbon')").show()
+    checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(10)))
+    sql(s"select * from $tableName").show()
+
+    expectSameResultBySchema(externalSegmentPath, schemaFilePath, tableName)
+    expectSameResultInferSchema(externalSegmentPath, tableName)
+
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+    sql(s"drop table $tableName")
+  }
+
+  /**
+   * use sdk to read the specified path using specified schema file
+   * and compare result with select * from tableName
+   */
+  def expectSameResultBySchema(pathToRead: String, schemaFilePath: String, tableName: String): Unit = {
+    val tableRows = sql(s"select * from $tableName").collectAsList()
+    val projection = Seq("empno", "empname", "designation", "doj",
+      "workgroupcategory", "workgroupcategoryname", "deptno", "deptname",
+      "projectcode", "projectjoindate", "projectenddate", "attendance",
+      "utilization", "salary").toArray
+    val reader = CarbonReader.builder(pathToRead)
+      .withRowRecordReader()
+      .withReadSupport(classOf[CarbonRowReadSupport])
+      .projection(projection)
+      .build()
+
+    var count = 0
+    while (reader.hasNext) {
+      val row = reader.readNextRow.asInstanceOf[CarbonRow]
+      val tableRow = tableRows.get(count)
+      var columnIndex = 0
+      for (column <- row.getData) {
+        val tableRowColumn = tableRow.get(columnIndex)
+        Assert.assertEquals(s"cell[$count, $columnIndex] not equal", tableRowColumn.toString, column.toString)
+        columnIndex = columnIndex + 1
+      }
+      count += 1
+    }
+    reader.close()
+  }
+
+  /**
+   * use sdk to read the specified path by inferring schema
+   * and compare result with select * from tableName
+   */
+  def expectSameResultInferSchema(pathToRead: String, tableName: String): Unit = {
+    val tableRows = sql(s"select * from $tableName").collectAsList()
+    val projection = Seq("empno", "empname", "designation", "doj",
+      "workgroupcategory", "workgroupcategoryname", "deptno", "deptname",
+      "projectcode", "projectjoindate", "projectenddate", "attendance",
+      "utilization", "salary").toArray
+    val reader = CarbonReader.builder(pathToRead)
+      .withRowRecordReader()
+      .withReadSupport(classOf[CarbonRowReadSupport])
+      .projection(projection)
+      .build()
+
+    var count = 0
+    while (reader.hasNext) {
+      val row = reader.readNextRow.asInstanceOf[CarbonRow]
+      val tableRow = tableRows.get(count)
+      var columnIndex = 0
+      for (column <- row.getData) {
+        val tableRowColumn = tableRow.get(columnIndex)
+        Assert.assertEquals(s"cell[$count, $columnIndex] not equal", tableRowColumn.toString, column.toString)
+        columnIndex = columnIndex + 1
+      }
+      count += 1
+    }
+    reader.close()
+  }
+
+  test("Test add segment by carbon written by sdk, and 1 load") {
+    val tableName = "add_segment_test"
+    sql(s"drop table if exists $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
+         | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+         | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+         | utilization int,salary int)
+         | STORED AS carbondata
+         |""".stripMargin)
+
+    sql(
+      s"""
+        |LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE $tableName
+        |OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"')
+        |""".stripMargin)
+
+    val externalSegmentPath = storeLocation + "/" + "external_segment"
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+
+    // write into external segment folder
+    val writer = CarbonWriter.builder
+      .outputPath(externalSegmentPath)
+      .withSchemaFile(s"$storeLocation/$tableName/Metadata/schema")
+      .writtenBy("AddSegmentTestCase")
+      .withCsvInput()
+      .build()
+    val source = Source.fromFile(s"$resourcesPath/data.csv")
+    var count = 0
+    for (line <- source.getLines()) {
+      if (count != 0) {
+        writer.write(line.split(","))
+      }
+      count = count + 1
+    }
+    writer.close()
+
+    sql(s"alter table $tableName add segment 
options('path'='$externalSegmentPath', 'format'='carbon')").show()
+    checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(20)))
+    checkAnswer(sql(s"select sum(empno) from $tableName where empname = 
'arvind' "), Seq(Row(22)))
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+    sql(s"drop table $tableName")
+  }
+
   def copy(oldLoc: String, newLoc: String): Unit = {
     val oldFolder = FileFactory.getCarbonFile(oldLoc)
     FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sdk/TestSDKWithTransactionalTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sdk/TestSDKWithTransactionalTable.scala
new file mode 100644
index 0000000..7d96d97
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sdk/TestSDKWithTransactionalTable.scala
@@ -0,0 +1,114 @@
+package org.apache.carbondata.spark.testsuite.sdk
+
+import java.io.{BufferedWriter, File, FileWriter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.file.{ArrowCarbonReader, CarbonReader, CarbonSchemaReader}
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestSDKWithTransactionalTable extends QueryTest with BeforeAndAfterAll {
+  var filePath: String = _
+
+  def buildTestData() =  {
+    filePath = s"${integrationPath}/spark-common-test/target/big.csv"
+    val file = new File(filePath)
+    val writer = new BufferedWriter(new FileWriter(file))
+    writer.write("c1, c2, c3, c4, c5, c6, c7, c8, c9, c10")
+    writer.newLine()
+    for (i <- 0 until 10) {
+      writer.write("a" + 1%1000 + "," +
+                   "b" + 1%1000 + "," +
+                   "c" + 1%1000 + "," +
+                   "d" + 1%1000 + "," +
+                   "e" + 1%1000 + "," +
+                   "f" + 1%1000 + "," +
+                   1%1000 + "," +
+                   1%1000 + "," +
+                   1%1000 + "," +
+                   1%1000 + "\n")
+      if ( i % 10000 == 0) {
+        writer.flush()
+      }
+    }
+    writer.close()
+  }
+
+  def dropTable() = {
+    sql("DROP TABLE IF EXISTS carbon_load1")
+    sql("DROP TABLE IF EXISTS train")
+    sql("DROP TABLE IF EXISTS test")
+  }
+
+  override def beforeAll {
+    dropTable
+    buildTestData
+  }
+
+  test("test sdk with transactional table, read as arrow") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load1(
+        |    c1 string, c2 string, c3 string, c4 string, c5 string,
+        |    c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+
+    val table = CarbonEnv.getCarbonTable(None, "carbon_load1")(sqlContext.sparkSession)
+
+    val reader:ArrowCarbonReader[Array[Object]] =
+      CarbonReader.builder(table.getTablePath, table.getTableName).buildArrowReader()
+
+    var count = 0
+    while(reader.hasNext) {
+      reader.readNextRow()
+      count += 1
+    }
+    reader.close()
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(count)))
+    sql("DROP TABLE carbon_load1")
+  }
+
+  test("test sdk with transactional table, read as row") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load1(
+        |    c1 string, c2 string, c3 string, c4 string, c5 string,
+        |    c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+
+    val table = CarbonEnv.getCarbonTable(None, "carbon_load1")(sqlContext.sparkSession)
+    val reader = CarbonReader.builder(table.getTablePath, table.getTableName).build()
+
+    var count = 0
+    while ( { reader.hasNext }) {
+      var row = reader.readNextRow.asInstanceOf[Array[AnyRef]]
+      count += 1
+    }
+    reader.close()
+
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(count)))
+    sql("DROP TABLE carbon_load1")
+  }
+
+  override def afterAll {
+    new File(filePath).delete()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+  }
+
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 052db05..919437c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -79,6 +79,8 @@ public class CarbonLoadModelBuilder {
         columns[i] = csvHeader.get(i).getColName();
       }
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+    } else {
+      optionsFinal.put("fileheader", options.get("fileheader"));
     }
     optionsFinal.put("bad_record_path", 
CarbonBadRecordUtil.getBadRecordsPath(options, table));
     optionsFinal.put("sort_scope",
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 6f04a58..c1e5120 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -169,13 +169,6 @@
   <build>
     <plugins>
       <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <configuration>
@@ -210,6 +203,14 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>8</source>
+          <target>8</target>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
   <profiles>
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 9a686bd..78f88be 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -39,6 +39,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
 import org.apache.commons.lang3.StringUtils;
@@ -65,6 +67,7 @@ public class CarbonReaderBuilder {
   private InputSplit inputSplit;
   private boolean useArrowReader;
   private List fileLists;
+  private Class<? extends CarbonReadSupport> readSupportClass;
 
   /**
    * Construct a CarbonReaderBuilder with table path and table name
@@ -132,6 +135,16 @@ public class CarbonReaderBuilder {
   }
 
   /**
+   * set read support class
+   * @param readSupportClass read support class
+   * @return CarbonReaderBuilder object
+   */
+  public CarbonReaderBuilder withReadSupport(Class<? extends CarbonReadSupport> readSupportClass) {
+    this.readSupportClass = readSupportClass;
+    return this;
+  }
+
+  /**
    * Configure the projection column names of carbon reader
    *
    * @param projectionColumnNames projection column names
@@ -240,11 +253,11 @@ public class CarbonReaderBuilder {
           ((CarbonInputSplit) inputSplit).getSegment().getReadCommittedScope().getFilePath();
       tableName = "UnknownTable" + UUID.randomUUID();
     }
-    CarbonTable table;
-    // now always infer schema. TODO:Refactor in next version.
     if (null == this.fileLists && null == tablePath) {
       throw new IllegalArgumentException("Please set table path first.");
     }
+    // infer schema
+    CarbonTable table;
     if (null != this.fileLists) {
       if (fileLists.size() < 1) {
         throw new IllegalArgumentException("fileLists must have one file in 
list as least!");
@@ -341,6 +354,7 @@ public class CarbonReaderBuilder {
     if (hadoopConf == null) {
       hadoopConf = FileFactory.getConfiguration();
     }
+    CarbonTableInputFormat.setCarbonReadSupport(hadoopConf, readSupportClass);
     final Job job = new Job(new JobConf(hadoopConf));
     CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
     try {
@@ -371,6 +385,7 @@ public class CarbonReaderBuilder {
     if (hadoopConf == null) {
       hadoopConf = FileFactory.getConfiguration();
     }
+    CarbonTableInputFormat.setCarbonReadSupport(hadoopConf, readSupportClass);
     final Job job = new Job(new JobConf(hadoopConf));
     CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
     format.setAllColumnProjectionIfNotConfigured(job,
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index 88cbc33..b68be3f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -30,11 +30,13 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.reader.CarbonFooterReaderV3;
 import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
@@ -159,6 +161,11 @@ public class CarbonSchemaReader {
    */
   public static Schema readSchema(String path, boolean validateSchema, Configuration conf)
       throws IOException {
+    // If a schema file exists, this is a transactional table; read the schema from that file
+    String schemaFilePath = CarbonTablePath.getSchemaFilePath(path);
+    if (FileFactory.getCarbonFile(schemaFilePath, conf).exists()) {
+      return readSchemaInSchemaFile(schemaFilePath, conf);
+    }
     if (path.endsWith(INDEX_FILE_EXT)) {
       return readSchemaFromIndexFile(path, conf);
     } else if (path.endsWith(CARBON_DATA_EXT)) {
@@ -191,6 +198,17 @@ public class CarbonSchemaReader {
     }
   }
 
+  private static Schema readSchemaInSchemaFile(String schemaFilePath, Configuration conf)
+      throws IOException {
+    org.apache.carbondata.format.TableInfo tableInfo =
+        CarbonUtil.readSchemaFile(schemaFilePath, conf);
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    TableSchema factTable =
+        schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "").getFactTable();
+    List<ColumnSchema> schemaList = factTable.getListOfColumns();
+    return new Schema(schemaList, factTable.getTableProperties());
+  }
+
   /**
    * read schema from path,
    * path can be folder path, carbonindex file path, and carbondata file path
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 29ef81f..1ae3d28 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -28,6 +28,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -39,9 +40,11 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.MapType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -83,6 +86,9 @@ public class CarbonWriterBuilder {
 
   private WRITER_TYPE writerType;
 
+  // can be set by withSchemaFile
+  private CarbonTable carbonTable;
+
   /**
    * Sets the output path of the writer builder
    *
@@ -172,6 +178,7 @@ public class CarbonWriterBuilder {
    *                g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
    *                h. quotechar
    *                i. escapechar
+   *                j. fileheader
    *                <p>
    *                Default values are as follows.
    *                <p>
@@ -184,6 +191,7 @@ public class CarbonWriterBuilder {
    *                g. complex_delimiter_level_2 -- "\002"
    *                h. quotechar -- "\""
    *                i. escapechar -- "\\"
+   *                j. fileheader -- None
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
@@ -200,7 +208,8 @@ public class CarbonWriterBuilder {
           !option.equalsIgnoreCase("complex_delimiter_level_3") &&
           !option.equalsIgnoreCase("quotechar") &&
           !option.equalsIgnoreCase("escapechar") &&
-          !option.equalsIgnoreCase("binary_decoder")) {
+          !option.equalsIgnoreCase("binary_decoder") &&
+          !option.equalsIgnoreCase("fileheader")) {
         throw new IllegalArgumentException("Unsupported option:" + option
             + ". Refer method header or documentation");
       }
@@ -480,6 +489,9 @@ public class CarbonWriterBuilder {
    */
   public CarbonWriterBuilder withCsvInput(Schema schema) {
     Objects.requireNonNull(schema, "schema should not be null");
+    if (this.schema != null) {
+      throw new IllegalArgumentException("schema should be set only once");
+    }
     this.schema = schema;
     this.writerType = WRITER_TYPE.CSV;
     return this;
@@ -488,11 +500,24 @@ public class CarbonWriterBuilder {
   /**
    * to build a {@link CarbonWriter}, which accepts row in CSV format
    *
+   * @return CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withCsvInput() {
+    this.writerType = WRITER_TYPE.CSV;
+    return this;
+  }
+
+  /**
+   * to build a {@link CarbonWriter}, which accepts row in CSV format
+   *
    * @param jsonSchema json Schema string
    * @return CarbonWriterBuilder
    */
   public CarbonWriterBuilder withCsvInput(String jsonSchema) {
     Objects.requireNonNull(jsonSchema, "schema should not be null");
+    if (this.schema != null) {
+      throw new IllegalArgumentException("schema should be set only once");
+    }
     this.schema = Schema.parseJson(jsonSchema);
     this.writerType = WRITER_TYPE.CSV;
     return this;
@@ -506,6 +531,9 @@ public class CarbonWriterBuilder {
    */
   public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) {
     Objects.requireNonNull(avroSchema, "Avro schema should not be null");
+    if (this.schema != null) {
+      throw new IllegalArgumentException("schema should be set only once");
+    }
     this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
     this.writerType = WRITER_TYPE.AVRO;
     return this;
@@ -519,12 +547,41 @@ public class CarbonWriterBuilder {
    */
   public CarbonWriterBuilder withJsonInput(Schema carbonSchema) {
     Objects.requireNonNull(carbonSchema, "schema should not be null");
+    if (this.schema != null) {
+      throw new IllegalArgumentException("schema should be set only once");
+    }
     this.schema = carbonSchema;
     this.writerType = WRITER_TYPE.JSON;
     return this;
   }
 
   /**
+   * to build a {@link CarbonWriter}, which accepts row in Json format
+   *
+   * @return CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withJsonInput() {
+    this.writerType = WRITER_TYPE.JSON;
+    return this;
+  }
+
+  public CarbonWriterBuilder withSchemaFile(String schemaFilePath) throws IOException {
+    Objects.requireNonNull(schemaFilePath, "schema file path should not be null");
+    if (path == null) {
+      throw new IllegalArgumentException("output path should be set before 
setting schema file");
+    }
+    carbonTable = SchemaReader.readCarbonTableFromSchema(schemaFilePath, new Configuration());
+    carbonTable.getTableInfo().setTablePath(path);
+    carbonTable.setTransactionalTable(false);
+    List<ColumnSchema> columnSchemas =
+        carbonTable.getCreateOrderColumn(carbonTable.getTableName()).stream().map(
+            CarbonColumn::getColumnSchema
+        ).collect(Collectors.toList());
+    schema = new Schema(columnSchemas);
+    return this;
+  }
+
+  /**
    * Build a {@link CarbonWriter}
    * This writer is not thread safe,
    * use withThreadSafe() configuration in multi thread environment
@@ -536,7 +593,7 @@ public class CarbonWriterBuilder {
   public CarbonWriter build() throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(path, "path should not be null");
     if (this.writerType == null) {
-      throw new IOException(
+      throw new RuntimeException(
           "'writerType' must be set, use withCsvInput() or withAvroInput() or 
withJsonInput()  "
               + "API based on input");
     }
@@ -545,6 +602,9 @@ public class CarbonWriterBuilder {
           "'writtenBy' must be set when writing carbon files, use writtenBy() 
API to "
               + "set it, it can be the name of the application which is using 
the SDK");
     }
+    if (this.schema == null) {
+      throw new RuntimeException("schema should be set");
+    }
     CarbonLoadModel loadModel = buildLoadModel(schema);
     loadModel.setSdkWriterCores(numOfThreads);
     CarbonProperties.getInstance()
@@ -613,10 +673,12 @@ public class CarbonWriterBuilder {
         }
       }
     }
-    // build CarbonTable using schema
-    CarbonTable table = buildCarbonTable();
+    if (carbonTable == null) {
+      // if carbonTable is not set by user, build it using schema
+      carbonTable = buildCarbonTable();
+    }
     // build LoadModel
-    return buildLoadModel(table, timestamp, taskNo, options);
+    return buildLoadModel(carbonTable, timestamp, taskNo, options);
   }
 
   private void validateLongStringColumns(Schema carbonSchema, Set<String> longStringColumns) {
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
index fb23725..58a43d0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
@@ -20,7 +20,9 @@ package org.apache.carbondata.sdk.file;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -41,12 +43,23 @@ public class Schema {
 
   private Field[] fields;
 
+  private Map<String, String> properties;
+
   /**
    * construct a schema with fields
    * @param fields
    */
   public Schema(Field[] fields) {
+    this(fields, new HashMap<String, String>());
+  }
+
+  /**
+   * construct a schema with fields
+   * @param fields
+   */
+  public Schema(Field[] fields, Map<String, String> properties) {
     this.fields = fields;
+    this.properties = properties;
   }
 
   /**
@@ -55,10 +68,20 @@ public class Schema {
    * @param columnSchemaList column schema list
    */
   public Schema(List<ColumnSchema> columnSchemaList) {
+    this(columnSchemaList, new HashMap<String, String>());
+  }
+
+  /**
+   * construct a schema with List<ColumnSchema>
+   *
+   * @param columnSchemaList column schema list
+   */
+  public Schema(List<ColumnSchema> columnSchemaList, Map<String, String> properties) {
     fields = new Field[columnSchemaList.size()];
     for (int i = 0; i < columnSchemaList.size(); i++) {
       fields[i] = new Field(columnSchemaList.get(i));
     }
+    this.properties = properties;
   }
 
   /**
@@ -152,6 +175,10 @@ public class Schema {
     return this;
   }
 
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
   @Override
   public int hashCode() {
     return super.hashCode();
