This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 49d6258 [CARBONDATA-3553] Support SDK Writer using existing schema file
49d6258 is described below
commit 49d62587d4db4d9afa9a8e52078a7af0ae46d58a
Author: liuzhi <[email protected]>
AuthorDate: Mon Oct 21 17:04:52 2019 +0800
[CARBONDATA-3553] Support SDK Writer using existing schema file
Sometimes users need to build the SDK writer from an existing schema file rather than from a Schema object. This makes the writer easier to use.
```
val writer = CarbonWriter.builder
.outputPath(...)
.withSchemaFile("/default/test/Metadata/schema")
.withCsvInput()
.build()
```
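The snippet above shows the minimal shape of the new API; per this patch, build() also requires writtenBy(), so a fuller Java sketch looks roughly like the following. The output path, schema-file path, and application name are placeholders, not values taken from this patch.
```
// Hedged sketch (not part of this patch): a complete writer flow using the new
// withSchemaFile()/withCsvInput() APIs. Paths and the writtenBy name are placeholders.
import java.io.IOException;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.sdk.file.CarbonWriter;

public class SchemaFileWriterExample {
  public static void main(String[] args) throws IOException, InvalidLoadOptionException {
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/external_segment")             // placeholder output folder
        .withSchemaFile("/default/test/Metadata/schema") // schema file of an existing table
        .writtenBy("SchemaFileWriterExample")            // build() rejects a missing writtenBy
        .withCsvInput()                                  // no-arg variant added by this patch
        .build();
    // Each call writes one row; the array must follow the column order declared in the
    // schema file (shortened here for illustration).
    writer.write(new String[]{"11", "arvind", "SE"});
    writer.close();
  }
}
```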
This closes #3415
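On the read path, this patch also adds CarbonReaderBuilder.withReadSupport(), which the new tests use to get decoded CarbonRow objects back instead of raw arrays. A hedged sketch of that usage follows; the path and projection columns are illustrative only.
```
// Hedged sketch: reading SDK-written files with the new withReadSupport() API.
// The path and projection below are placeholders based on the test's column names.
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
import org.apache.carbondata.sdk.file.CarbonReader;

public class ReadSupportExample {
  public static void main(String[] args) throws Exception {
    CarbonReader reader = CarbonReader.builder("/tmp/external_segment")
        .withRowRecordReader()                        // row-by-row reader
        .withReadSupport(CarbonRowReadSupport.class)  // decode each row into a CarbonRow
        .projection(new String[]{"empno", "empname"})
        .build();
    while (reader.hasNext()) {
      CarbonRow row = (CarbonRow) reader.readNextRow();
      System.out.println(java.util.Arrays.toString(row.getData()));
    }
    reader.close();
  }
}
```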
---
.../core/metadata/converter/SchemaConverter.java | 13 +-
.../core/metadata/schema/SchemaReader.java | 39 +++++
.../core/metadata/schema/table/CarbonTable.java | 1 +
.../apache/carbondata/core/util/CarbonUtil.java | 22 ++-
.../carbondata/hadoop/api/CarbonInputFormat.java | 6 +-
.../readsupport/impl/CarbonRowReadSupport.java | 1 +
.../testsuite/addsegment/AddSegmentTestCase.scala | 158 ++++++++++++++++++++-
.../sdk/TestSDKWithTransactionalTable.scala | 114 +++++++++++++++
.../loading/model/CarbonLoadModelBuilder.java | 2 +
store/sdk/pom.xml | 15 +-
.../carbondata/sdk/file/CarbonReaderBuilder.java | 19 ++-
.../carbondata/sdk/file/CarbonSchemaReader.java | 18 +++
.../carbondata/sdk/file/CarbonWriterBuilder.java | 72 +++++++++-
.../org/apache/carbondata/sdk/file/Schema.java | 27 ++++
14 files changed, 480 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
index 4f34070..4d8d739 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
@@ -94,16 +94,19 @@ public interface SchemaConverter {
org.apache.carbondata.format.TableSchema externalTableSchema, String tableNam);
/**
- * @param externalTableInfo
- * @param dbName
- * @param tableName
- * @return
+ * method to convert thrift table info object to wrapper table info
+ *
+ * @param externalTableInfo thrift table info object
+ * @param dbName database name
+ * @param tableName table name
+ * @param tablePath table path
+ * @return TableInfo
*/
TableInfo fromExternalToWrapperTableInfo(
org.apache.carbondata.format.TableInfo externalTableInfo,
String dbName,
String tableName,
- String storePath);
+ String tablePath);
/**
* method to convert thrift datamap schema object to wrapper
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 4cd4195..9d975e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -59,6 +59,45 @@ public class SchemaReader {
}
/**
+ * Read specified schema file as CarbonTable
+ * @param schemaFilePath schema file path
+ * @param conf hadoop configuration
+ * @return CarbonTable object
+ * @throws IOException if IO error occurs
+ */
+ public static CarbonTable readCarbonTableFromSchema(String schemaFilePath, Configuration conf)
+ throws IOException {
+ TableInfo tableInfo = readTableInfoFromSchema(schemaFilePath, conf);
+ CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
+ return CarbonMetadata.getInstance().getCarbonTable("dummy_dummy");
+ }
+
+ /**
+ * Read specified schema file as TableInfo
+ * @param schemaFilePath schema file path
+ * @param conf hadoop configuration
+ * @return TableInfo object
+ * @throws IOException if IO error occurs
+ */
+ private static TableInfo readTableInfoFromSchema(String schemaFilePath, Configuration conf)
+ throws IOException {
+ if (FileFactory.isFileExist(schemaFilePath)) {
+ String tableName = "dummy";
+
+ org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.readSchemaFile(schemaFilePath);
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ tableInfo,
+ "dummy",
+ tableName,
+ "dummy");
+ return wrapperTableInfo;
+ } else {
+ throw new IOException("File does not exist: " + schemaFilePath);
+ }
+ }
+
+ /**
* the method returns the Wrapper TableInfo
*
* @param identifier
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index aa82b64..f70d6fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1152,6 +1152,7 @@ public class CarbonTable implements Serializable, Writable {
public void setTransactionalTable(boolean transactionalTable) {
isTransactionalTable = transactionalTable;
+ getTableInfo().setTransactionalTable(transactionalTable);
}
/**
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 19d142e..66618a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2040,13 +2040,19 @@ public final class CarbonUtil {
*/
public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath)
throws IOException {
+ return readSchemaFile(schemaFilePath, FileFactory.getConfiguration());
+ }
+
+ public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath,
+ Configuration conf)
+ throws IOException {
TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
org.apache.carbondata.format.TableInfo._Fields> create() {
return new org.apache.carbondata.format.TableInfo();
}
};
- ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase);
+ ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, conf);
thriftReader.open();
org.apache.carbondata.format.TableInfo tableInfo =
(org.apache.carbondata.format.TableInfo) thriftReader.read();
@@ -2541,12 +2547,14 @@ public final class CarbonUtil {
}
// Get the total size of carbon data and the total size of carbon index
- private static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
- String segmentId) throws IOException {
+ public static HashMap<String, Long> getDataSizeAndIndexSize(
+ String segmentPath) throws IOException {
+ if (segmentPath == null) {
+ throw new IllegalArgumentException("Argument [segmentPath] is null.");
+ }
long carbonDataSize = 0L;
long carbonIndexSize = 0L;
HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
- String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
FileFactory.FileType fileType = FileFactory.getFileType(segmentPath);
switch (fileType) {
case HDFS:
@@ -2595,6 +2603,12 @@ public final class CarbonUtil {
}
// Get the total size of carbon data and the total size of carbon index
+ private static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
+ String segmentId) throws IOException {
+ return getDataSizeAndIndexSize(CarbonTablePath.getSegmentPath(tablePath, segmentId));
+ }
+
+ // Get the total size of carbon data and the total size of carbon index
public static HashMap<String, Long> getDataSizeAndIndexSize(SegmentFileStore fileStore)
throws IOException {
long carbonDataSize = 0L;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 56ccabc..42fae67 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -310,7 +310,7 @@ m filterExpression
public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
- getQuerySegmentToAccess(conf, dbName, tbName);
+ setQuerySegmentToAccess(conf, dbName, tbName);
}
/**
@@ -898,7 +898,7 @@ m filterExpression
return projectColumns.toArray(new String[projectColumns.size()]);
}
- private static void getQuerySegmentToAccess(Configuration conf, String dbName, String tableName) {
+ private static void setQuerySegmentToAccess(Configuration conf, String dbName, String tableName) {
String segmentNumbersFromProperty = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tableName, "*");
if (!segmentNumbersFromProperty.trim().equals("*")) {
@@ -912,7 +912,7 @@ m filterExpression
*/
public static void setQuerySegment(Configuration conf, CarbonTable carbonTable) {
String tableName = carbonTable.getTableName();
- getQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
+ setQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
}
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
index e2b5e60..1c8503d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
@@ -41,6 +41,7 @@ public class CarbonRowReadSupport extends DictionaryDecodeReadSupport<CarbonRow>
Calendar c = Calendar.getInstance();
c.setTime(new Date(0));
c.add(Calendar.DAY_OF_YEAR, (Integer) data[i]);
+ c.add(Calendar.DATE, 1);
data[i] = new Date(c.getTime().getTime());
} else if (dataTypes[i] == DataTypes.TIMESTAMP) {
data[i] = new Timestamp((long) data[i] / 1000);
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 50927ee..c4acf8e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -24,13 +24,18 @@ import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
import org.scalatest.BeforeAndAfterAll
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
+import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.junit.Assert
+
+import scala.io.Source
class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
@@ -258,6 +263,157 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
+ test("Test add segment by carbon written by sdk") {
+ val tableName = "add_segment_test"
+ sql(s"drop table if exists $tableName")
+ sql(
+ s"""
+ | CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | STORED AS carbondata
+ |""".stripMargin)
+
+ val externalSegmentPath = storeLocation + "/" + "external_segment"
+ FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+
+ // write into external segment folder
+ val schemaFilePath = s"$storeLocation/$tableName/Metadata/schema"
+ val writer = CarbonWriter.builder
+ .outputPath(externalSegmentPath)
+ .withSchemaFile(schemaFilePath)
+ .writtenBy("AddSegmentTestCase")
+ .withCsvInput()
+ .build()
+ val source = Source.fromFile(s"$resourcesPath/data.csv")
+ var count = 0
+ for (line <- source.getLines()) {
+ if (count != 0) {
+ writer.write(line.split(","))
+ }
+ count = count + 1
+ }
+ writer.close()
+
+ sql(s"alter table $tableName add segment
options('path'='$externalSegmentPath', 'format'='carbon')").show()
+ checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(10)))
+ sql(s"select * from $tableName").show()
+
+ expectSameResultBySchema(externalSegmentPath, schemaFilePath, tableName)
+ expectSameResultInferSchema(externalSegmentPath, tableName)
+
+ FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+ sql(s"drop table $tableName")
+ }
+
+ /**
+ * use sdk to read the specified path using specified schema file
+ * and compare result with select * from tableName
+ */
+ def expectSameResultBySchema(pathToRead: String, schemaFilePath: String, tableName: String): Unit = {
+ val tableRows = sql(s"select * from $tableName").collectAsList()
+ val projection = Seq("empno", "empname", "designation", "doj",
+ "workgroupcategory", "workgroupcategoryname", "deptno", "deptname",
+ "projectcode", "projectjoindate", "projectenddate", "attendance",
+ "utilization", "salary").toArray
+ val reader = CarbonReader.builder(pathToRead)
+ .withRowRecordReader()
+ .withReadSupport(classOf[CarbonRowReadSupport])
+ .projection(projection)
+ .build()
+
+ var count = 0
+ while (reader.hasNext) {
+ val row = reader.readNextRow.asInstanceOf[CarbonRow]
+ val tableRow = tableRows.get(count)
+ var columnIndex = 0
+ for (column <- row.getData) {
+ val tableRowColumn = tableRow.get(columnIndex)
+ Assert.assertEquals(s"cell[$count, $columnIndex] not equal",
tableRowColumn.toString, column.toString)
+ columnIndex = columnIndex + 1
+ }
+ count += 1
+ }
+ reader.close()
+ }
+
+ /**
+ * use sdk to read the specified path by inferring schema
+ * and compare result with select * from tableName
+ */
+ def expectSameResultInferSchema(pathToRead: String, tableName: String): Unit = {
+ val tableRows = sql(s"select * from $tableName").collectAsList()
+ val projection = Seq("empno", "empname", "designation", "doj",
+ "workgroupcategory", "workgroupcategoryname", "deptno", "deptname",
+ "projectcode", "projectjoindate", "projectenddate", "attendance",
+ "utilization", "salary").toArray
+ val reader = CarbonReader.builder(pathToRead)
+ .withRowRecordReader()
+ .withReadSupport(classOf[CarbonRowReadSupport])
+ .projection(projection)
+ .build()
+
+ var count = 0
+ while (reader.hasNext) {
+ val row = reader.readNextRow.asInstanceOf[CarbonRow]
+ val tableRow = tableRows.get(count)
+ var columnIndex = 0
+ for (column <- row.getData) {
+ val tableRowColumn = tableRow.get(columnIndex)
+ Assert.assertEquals(s"cell[$count, $columnIndex] not equal",
tableRowColumn.toString, column.toString)
+ columnIndex = columnIndex + 1
+ }
+ count += 1
+ }
+ reader.close()
+ }
+
+ test("Test add segment by carbon written by sdk, and 1 load") {
+ val tableName = "add_segment_test"
+ sql(s"drop table if exists $tableName")
+ sql(
+ s"""
+ | CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | STORED AS carbondata
+ |""".stripMargin)
+
+ sql(
+ s"""
+ |LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE $tableName
+ |OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"')
+ |""".stripMargin)
+
+ val externalSegmentPath = storeLocation + "/" + "external_segment"
+ FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+
+ // write into external segment folder
+ val writer = CarbonWriter.builder
+ .outputPath(externalSegmentPath)
+ .withSchemaFile(s"$storeLocation/$tableName/Metadata/schema")
+ .writtenBy("AddSegmentTestCase")
+ .withCsvInput()
+ .build()
+ val source = Source.fromFile(s"$resourcesPath/data.csv")
+ var count = 0
+ for (line <- source.getLines()) {
+ if (count != 0) {
+ writer.write(line.split(","))
+ }
+ count = count + 1
+ }
+ writer.close()
+
+ sql(s"alter table $tableName add segment
options('path'='$externalSegmentPath', 'format'='carbon')").show()
+ checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(20)))
+ checkAnswer(sql(s"select sum(empno) from $tableName where empname =
'arvind' "), Seq(Row(22)))
+ FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+ sql(s"drop table $tableName")
+ }
+
def copy(oldLoc: String, newLoc: String): Unit = {
val oldFolder = FileFactory.getCarbonFile(oldLoc)
FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sdk/TestSDKWithTransactionalTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sdk/TestSDKWithTransactionalTable.scala
new file mode 100644
index 0000000..7d96d97
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sdk/TestSDKWithTransactionalTable.scala
@@ -0,0 +1,114 @@
+package org.apache.carbondata.spark.testsuite.sdk
+
+import java.io.{BufferedWriter, File, FileWriter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.file.{ArrowCarbonReader, CarbonReader, CarbonSchemaReader}
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestSDKWithTransactionalTable extends QueryTest with BeforeAndAfterAll {
+ var filePath: String = _
+
+ def buildTestData() = {
+ filePath = s"${integrationPath}/spark-common-test/target/big.csv"
+ val file = new File(filePath)
+ val writer = new BufferedWriter(new FileWriter(file))
+ writer.write("c1, c2, c3, c4, c5, c6, c7, c8, c9, c10")
+ writer.newLine()
+ for (i <- 0 until 10) {
+ writer.write("a" + 1%1000 + "," +
+ "b" + 1%1000 + "," +
+ "c" + 1%1000 + "," +
+ "d" + 1%1000 + "," +
+ "e" + 1%1000 + "," +
+ "f" + 1%1000 + "," +
+ 1%1000 + "," +
+ 1%1000 + "," +
+ 1%1000 + "," +
+ 1%1000 + "\n")
+ if ( i % 10000 == 0) {
+ writer.flush()
+ }
+ }
+ writer.close()
+ }
+
+ def dropTable() = {
+ sql("DROP TABLE IF EXISTS carbon_load1")
+ sql("DROP TABLE IF EXISTS train")
+ sql("DROP TABLE IF EXISTS test")
+ }
+
+ override def beforeAll {
+ dropTable
+ buildTestData
+ }
+
+ test("test sdk with transactional table, read as arrow") {
+
+ sql(
+ """
+ | CREATE TABLE carbon_load1(
+ | c1 string, c2 string, c3 string, c4 string, c5 string,
+ | c6 string, c7 int, c8 int, c9 int, c10 int)
+ | STORED AS carbondata
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+
+ val table = CarbonEnv.getCarbonTable(None, "carbon_load1")(sqlContext.sparkSession)
+
+ val reader:ArrowCarbonReader[Array[Object]] =
+ CarbonReader.builder(table.getTablePath, table.getTableName).buildArrowReader()
+
+ var count = 0
+ while(reader.hasNext) {
+ reader.readNextRow()
+ count += 1
+ }
+ reader.close()
+ checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(count)))
+ sql("DROP TABLE carbon_load1")
+ }
+
+ test("test sdk with transactional table, read as row") {
+
+ sql(
+ """
+ | CREATE TABLE carbon_load1(
+ | c1 string, c2 string, c3 string, c4 string, c5 string,
+ | c6 string, c7 int, c8 int, c9 int, c10 int)
+ | STORED AS carbondata
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+
+ val table = CarbonEnv.getCarbonTable(None, "carbon_load1")(sqlContext.sparkSession)
+ val reader = CarbonReader.builder(table.getTablePath, table.getTableName).build()
+
+ var count = 0
+ while ( { reader.hasNext }) {
+ var row = reader.readNextRow.asInstanceOf[Array[AnyRef]]
+ count += 1
+ }
+ reader.close()
+
+ checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(count)))
+ sql("DROP TABLE carbon_load1")
+ }
+
+ override def afterAll {
+ new File(filePath).delete()
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+ }
+
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 052db05..919437c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -79,6 +79,8 @@ public class CarbonLoadModelBuilder {
columns[i] = csvHeader.get(i).getColName();
}
optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+ } else {
+ optionsFinal.put("fileheader", options.get("fileheader"));
}
optionsFinal.put("bad_record_path",
CarbonBadRecordUtil.getBadRecordsPath(options, table));
optionsFinal.put("sort_scope",
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 6f04a58..c1e5120 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -169,13 +169,6 @@
<build>
<plugins>
<plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
@@ -210,6 +203,14 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
</plugins>
</build>
<profiles>
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 9a686bd..78f88be 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -39,6 +39,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
import org.apache.commons.lang3.StringUtils;
@@ -65,6 +67,7 @@ public class CarbonReaderBuilder {
private InputSplit inputSplit;
private boolean useArrowReader;
private List fileLists;
+ private Class<? extends CarbonReadSupport> readSupportClass;
/**
* Construct a CarbonReaderBuilder with table path and table name
@@ -132,6 +135,16 @@ public class CarbonReaderBuilder {
}
/**
+ * set read support class
+ * @param readSupportClass read support class
+ * @return CarbonReaderBuilder object
+ */
+ public CarbonReaderBuilder withReadSupport(Class<? extends CarbonReadSupport> readSupportClass) {
+ this.readSupportClass = readSupportClass;
+ return this;
+ }
+
+ /**
* Configure the projection column names of carbon reader
*
* @param projectionColumnNames projection column names
@@ -240,11 +253,11 @@ public class CarbonReaderBuilder {
((CarbonInputSplit) inputSplit).getSegment().getReadCommittedScope().getFilePath();
tableName = "UnknownTable" + UUID.randomUUID();
}
- CarbonTable table;
- // now always infer schema. TODO:Refactor in next version.
if (null == this.fileLists && null == tablePath) {
throw new IllegalArgumentException("Please set table path first.");
}
+ // infer schema
+ CarbonTable table;
if (null != this.fileLists) {
if (fileLists.size() < 1) {
throw new IllegalArgumentException("fileLists must have one file in
list as least!");
@@ -341,6 +354,7 @@ public class CarbonReaderBuilder {
if (hadoopConf == null) {
hadoopConf = FileFactory.getConfiguration();
}
+ CarbonTableInputFormat.setCarbonReadSupport(hadoopConf, readSupportClass);
final Job job = new Job(new JobConf(hadoopConf));
CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
try {
@@ -371,6 +385,7 @@ public class CarbonReaderBuilder {
if (hadoopConf == null) {
hadoopConf = FileFactory.getConfiguration();
}
+ CarbonTableInputFormat.setCarbonReadSupport(hadoopConf, readSupportClass);
final Job job = new Job(new JobConf(hadoopConf));
CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
format.setAllColumnProjectionIfNotConfigured(job,
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index 88cbc33..b68be3f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -30,11 +30,13 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonFooterReaderV3;
import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.FileFooter3;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
@@ -159,6 +161,11 @@ public class CarbonSchemaReader {
*/
public static Schema readSchema(String path, boolean validateSchema, Configuration conf)
throws IOException {
+ // Check whether it is a transactional table; if so, read the schema from its schema file
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(path);
+ if (FileFactory.getCarbonFile(schemaFilePath, conf).exists()) {
+ return readSchemaInSchemaFile(schemaFilePath, conf);
+ }
if (path.endsWith(INDEX_FILE_EXT)) {
return readSchemaFromIndexFile(path, conf);
} else if (path.endsWith(CARBON_DATA_EXT)) {
@@ -191,6 +198,17 @@ public class CarbonSchemaReader {
}
}
+ private static Schema readSchemaInSchemaFile(String schemaFilePath, Configuration conf)
+ throws IOException {
+ org.apache.carbondata.format.TableInfo tableInfo =
+ CarbonUtil.readSchemaFile(schemaFilePath, conf);
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableSchema factTable =
+ schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "",
"").getFactTable();
+ List<ColumnSchema> schemaList = factTable.getListOfColumns();
+ return new Schema(schemaList, factTable.getTableProperties());
+ }
+
/**
* read schema from path,
* path can be folder path, carbonindex file path, and carbondata file path
diff --git
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 29ef81f..1ae3d28 100644
---
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -28,6 +28,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -39,9 +40,11 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.MapType;
import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -83,6 +86,9 @@ public class CarbonWriterBuilder {
private WRITER_TYPE writerType;
+ // can be set by withSchemaFile
+ private CarbonTable carbonTable;
+
/**
* Sets the output path of the writer builder
*
@@ -172,6 +178,7 @@ public class CarbonWriterBuilder {
* g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
* h. quotechar
* i. escapechar
+ * j. fileheader
* <p>
* Default values are as follows.
* <p>
@@ -184,6 +191,7 @@ public class CarbonWriterBuilder {
* g. complex_delimiter_level_2 -- "\002"
* h. quotechar -- "\""
* i. escapechar -- "\\"
+ * j. fileheader -- None
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
@@ -200,7 +208,8 @@ public class CarbonWriterBuilder {
!option.equalsIgnoreCase("complex_delimiter_level_3") &&
!option.equalsIgnoreCase("quotechar") &&
!option.equalsIgnoreCase("escapechar") &&
- !option.equalsIgnoreCase("binary_decoder")) {
+ !option.equalsIgnoreCase("binary_decoder") &&
+ !option.equalsIgnoreCase("fileheader")) {
throw new IllegalArgumentException("Unsupported option:" + option
+ ". Refer method header or documentation");
}
@@ -480,6 +489,9 @@ public class CarbonWriterBuilder {
*/
public CarbonWriterBuilder withCsvInput(Schema schema) {
Objects.requireNonNull(schema, "schema should not be null");
+ if (this.schema != null) {
+ throw new IllegalArgumentException("schema should be set only once");
+ }
this.schema = schema;
this.writerType = WRITER_TYPE.CSV;
return this;
@@ -488,11 +500,24 @@ public class CarbonWriterBuilder {
/**
* to build a {@link CarbonWriter}, which accepts row in CSV format
*
+ * @return CarbonWriterBuilder
+ */
+ public CarbonWriterBuilder withCsvInput() {
+ this.writerType = WRITER_TYPE.CSV;
+ return this;
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts row in CSV format
+ *
* @param jsonSchema json Schema string
* @return CarbonWriterBuilder
*/
public CarbonWriterBuilder withCsvInput(String jsonSchema) {
Objects.requireNonNull(jsonSchema, "schema should not be null");
+ if (this.schema != null) {
+ throw new IllegalArgumentException("schema should be set only once");
+ }
this.schema = Schema.parseJson(jsonSchema);
this.writerType = WRITER_TYPE.CSV;
return this;
@@ -506,6 +531,9 @@ public class CarbonWriterBuilder {
*/
public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) {
Objects.requireNonNull(avroSchema, "Avro schema should not be null");
+ if (this.schema != null) {
+ throw new IllegalArgumentException("schema should be set only once");
+ }
this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
this.writerType = WRITER_TYPE.AVRO;
return this;
@@ -519,12 +547,41 @@ public class CarbonWriterBuilder {
*/
public CarbonWriterBuilder withJsonInput(Schema carbonSchema) {
Objects.requireNonNull(carbonSchema, "schema should not be null");
+ if (this.schema != null) {
+ throw new IllegalArgumentException("schema should be set only once");
+ }
this.schema = carbonSchema;
this.writerType = WRITER_TYPE.JSON;
return this;
}
/**
+ * to build a {@link CarbonWriter}, which accepts row in Json format
+ *
+ * @return CarbonWriterBuilder
+ */
+ public CarbonWriterBuilder withJsonInput() {
+ this.writerType = WRITER_TYPE.JSON;
+ return this;
+ }
+
+ public CarbonWriterBuilder withSchemaFile(String schemaFilePath) throws IOException {
+ Objects.requireNonNull(schemaFilePath, "schema file path should not be null");
+ if (path == null) {
+ throw new IllegalArgumentException("output path should be set before
setting schema file");
+ }
+ carbonTable = SchemaReader.readCarbonTableFromSchema(schemaFilePath, new Configuration());
+ carbonTable.getTableInfo().setTablePath(path);
+ carbonTable.setTransactionalTable(false);
+ List<ColumnSchema> columnSchemas =
+ carbonTable.getCreateOrderColumn(carbonTable.getTableName()).stream().map(
+ CarbonColumn::getColumnSchema
+ ).collect(Collectors.toList());
+ schema = new Schema(columnSchemas);
+ return this;
+ }
+
+ /**
* Build a {@link CarbonWriter}
* This writer is not thread safe,
* use withThreadSafe() configuration in multi thread environment
@@ -536,7 +593,7 @@ public class CarbonWriterBuilder {
public CarbonWriter build() throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(path, "path should not be null");
if (this.writerType == null) {
- throw new IOException(
+ throw new RuntimeException(
"'writerType' must be set, use withCsvInput() or withAvroInput() or
withJsonInput() "
+ "API based on input");
}
@@ -545,6 +602,9 @@ public class CarbonWriterBuilder {
"'writtenBy' must be set when writing carbon files, use writtenBy()
API to "
+ "set it, it can be the name of the application which is using
the SDK");
}
+ if (this.schema == null) {
+ throw new RuntimeException("schema should be set");
+ }
CarbonLoadModel loadModel = buildLoadModel(schema);
loadModel.setSdkWriterCores(numOfThreads);
CarbonProperties.getInstance()
@@ -613,10 +673,12 @@ public class CarbonWriterBuilder {
}
}
}
- // build CarbonTable using schema
- CarbonTable table = buildCarbonTable();
+ if (carbonTable == null) {
+ // if carbonTable is not set by user, build it using schema
+ carbonTable = buildCarbonTable();
+ }
// build LoadModel
- return buildLoadModel(table, timestamp, taskNo, options);
+ return buildLoadModel(carbonTable, timestamp, taskNo, options);
}
private void validateLongStringColumns(Schema carbonSchema, Set<String> longStringColumns) {
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
index fb23725..58a43d0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
@@ -20,7 +20,9 @@ package org.apache.carbondata.sdk.file;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -41,12 +43,23 @@ public class Schema {
private Field[] fields;
+ private Map<String, String> properties;
+
/**
* construct a schema with fields
* @param fields
*/
public Schema(Field[] fields) {
+ this(fields, new HashMap<String, String>());
+ }
+
+ /**
+ * construct a schema with fields
+ * @param fields
+ */
+ public Schema(Field[] fields, Map<String, String> properties) {
this.fields = fields;
+ this.properties = properties;
}
/**
@@ -55,10 +68,20 @@ public class Schema {
* @param columnSchemaList column schema list
*/
public Schema(List<ColumnSchema> columnSchemaList) {
+ this(columnSchemaList, new HashMap<String, String>());
+ }
+
+ /**
+ * construct a schema with List<ColumnSchema>
+ *
+ * @param columnSchemaList column schema list
+ */
+ public Schema(List<ColumnSchema> columnSchemaList, Map<String, String> properties) {
fields = new Field[columnSchemaList.size()];
for (int i = 0; i < columnSchemaList.size(); i++) {
fields[i] = new Field(columnSchemaList.get(i));
}
+ this.properties = properties;
}
/**
@@ -152,6 +175,10 @@ public class Schema {
return this;
}
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
@Override
public int hashCode() {
return super.hashCode();