This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new a659cfb [CARBONDATA-3855] Support carbon SDK to load data from different files
a659cfb is described below
commit a659cfb89dbe897938e8c231f873cbdb9bb5bd7f
Author: Nihal ojha <[email protected]>
AuthorDate: Fri Jun 26 14:36:25 2020 +0530
[CARBONDATA-3855] Support carbon SDK to load data from different files
Why is this PR needed?
Currently, the carbon SDK does not support loading data from Parquet, ORC, CSV,
Avro, or JSON files; loading these file types is supported by the Spark SQL
library. This PR adds support in the carbon SDK for loading data from Parquet,
Avro, CSV, ORC, and JSON files.
What changes were proposed in this PR?
With this PR, carbon can load data from the file types mentioned above. Loading
can be done in several ways: carbon can load a single file at a given path, load
all the files under a given directory, or load only selected files under a given
directory. When loading multiple files, all of the files must have the same schema
and must be of the same type, i.e. all Parquet, ORC, Avro, CSV, or JSON. A minimal
usage sketch is shown below.
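A hedged usage sketch (illustrative only, not taken verbatim from this patch) of loading
selected CSV files through the new builder API; it assumes the existing outputPath() and
writtenBy() calls from the SDK guide, and all paths and file names are placeholders:
```java
import java.util.Arrays;

import org.apache.carbondata.sdk.file.CarbonWriter;

public class SdkCsvLoadExample {
  public static void main(String[] args) throws Exception {
    // Load only the two selected CSV files found under the given directory
    // and write them out as carbondata files at the output path.
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/carbon_output")          // placeholder output location
        .writtenBy("SdkCsvLoadExample")            // mandatory application name
        .withCsvPath("/tmp/csv_files",
            Arrays.asList("primitive_data.csv", "primitive_data_2.csv"))
        .build();
    writer.write();   // iterates over the selected files internally
    writer.close();
  }
}
```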
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3819
---
.../core/constants/CarbonCommonConstants.java | 32 ++
docs/sdk-guide.md | 141 +++++++++
examples/spark/pom.xml | 6 +
integration/spark/pom.xml | 36 +++
.../converter/impl/BinaryFieldConverterImpl.java | 2 +-
sdk/sdk/pom.xml | 22 ++
.../carbondata/sdk/file/AvroCarbonWriter.java | 94 ++++++
.../carbondata/sdk/file/CSVCarbonWriter.java | 85 +++++
.../apache/carbondata/sdk/file/CarbonWriter.java | 11 +
.../carbondata/sdk/file/CarbonWriterBuilder.java | 255 ++++++++++++++-
.../carbondata/sdk/file/JsonCarbonWriter.java | 87 ++++++
.../carbondata/sdk/file/ORCCarbonWriter.java | 347 +++++++++++++++++++++
.../carbondata/sdk/file/ParquetCarbonWriter.java | 131 ++++++++
.../apache/carbondata/sdk/file/utils/SDKUtil.java | 14 +
.../carbondata/sdk/file/AvroCarbonWriterTest.java | 126 ++++++++
.../carbondata/sdk/file/CSVCarbonWriterTest.java | 172 ++++++++++
.../carbondata/sdk/file/JSONCarbonWriterTest.java | 237 ++++++++++++++
.../carbondata/sdk/file/ORCCarbonWriterTest.java | 149 +++++++++
.../sdk/file/ParquetCarbonWriterTest.java | 144 +++++++++
sdk/sdk/src/test/resources/file/NestedMap.parquet | Bin 0 -> 622 bytes
.../src/test/resources/file/avro_files/users.avro | Bin 0 -> 334 bytes
.../test/resources/file/avro_files/users_2.avro | Bin 0 -> 334 bytes
.../test/resources/file/avro_files/users_3.avro | Bin 0 -> 334 bytes
.../resources/file/csv_files/primitive_data.csv | 11 +
.../resources/file/csv_files/primitive_data_2.csv | 11 +
.../resources/file/csv_files/primitive_data_3.csv | 11 +
.../file/json_files/allPrimitiveType.json | 11 +
.../json_files/allPrimitiveTypeMultipleRows.json | 46 +++
.../json_files/allPrimitiveTypeSingleArray.json | 13 +
sdk/sdk/src/test/resources/file/nested_schema.avro | 91 ++++++
.../src/test/resources/file/orc_files/sample.orc | Bin 0 -> 1711 bytes
.../src/test/resources/file/orc_files/sample_2.orc | Bin 0 -> 1711 bytes
.../src/test/resources/file/orc_files/sample_3.orc | Bin 0 -> 1711 bytes
.../resources/file/parquet_files/file1.parquet | Bin 0 -> 520 bytes
.../resources/file/parquet_files/file2.parquet | Bin 0 -> 520 bytes
.../resources/file/parquet_files/file3.parquet | Bin 0 -> 520 bytes
.../test/resources/file/repeated-schema.parquet | Bin 0 -> 411 bytes
sdk/sdk/src/test/resources/file/testTimestamp.orc | Bin 0 -> 289 bytes
sdk/sdk/src/test/resources/file/userdata1.avro | Bin 0 -> 93561 bytes
sdk/sdk/src/test/resources/file/userdata1.parquet | Bin 0 -> 113629 bytes
sdk/sdk/src/test/resources/file/userdata1_orc | Bin 0 -> 47448 bytes
sdk/sdk/src/test/resources/file/weather.avro | Bin 0 -> 358 bytes
42 files changed, 2277 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index cde3efa..ca8e171 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1789,6 +1789,21 @@ public final class CarbonCommonConstants {
public static final String CSV_FILE_EXTENSION = ".csv";
/**
+ * AVRO_FILE_EXTENSION
+ */
+ public static final String AVRO_FILE_EXTENSION = ".avro";
+
+ /**
+ * JSON_FILE_EXTENSION
+ */
+ public static final String JSON_FILE_EXTENSION = ".json";
+
+ /**
+ * ORC_FILE_EXTENSION
+ */
+ public static final String ORC_FILE_EXTENSION = ".orc";
+
+ /**
* LOG_FILE_EXTENSION
*/
public static final String LOG_FILE_EXTENSION = ".log";
@@ -2525,4 +2540,21 @@ public final class CarbonCommonConstants {
* property which defines the presto query default value
*/
public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
+
+ /**
+ * the level 1 complex delimiter default value
+ */
+ public static final String COMPLEX_DELIMITER_LEVEL_1_DEFAULT = "#";
+
+ /**
+ * the level 2 complex delimiter default value
+ */
+ public static final String COMPLEX_DELIMITER_LEVEL_2_DEFAULT = "$";
+
+ /**
+ * the level 3 complex delimiter default value
+ */
+ public static final String COMPLEX_DELIMITER_LEVEL_3_DEFAULT = "@";
+
+ public static final String FILE_HEADER = "fileHeader";
}
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 73aed3b..4da7893 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -454,6 +454,131 @@ public CarbonWriterBuilder withJsonInput(Schema
carbonSchema);
```
/**
+ * To build a {@link CarbonWriter}, which accepts CSV file or directory
+ * that has to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withCsvPath(String filePath);
+```
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts a directory of CSV files and a
+ * list of selected files that have to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @param fileList list of selected files which have to be loaded under the given directory.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withCsvPath(String filePath, List<String> fileList) throws IOException;
+```
+
+Find example code at [CsvCarbonWriterTest](https://github.com/apache/carbondata/blob/master/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java) in the CarbonData repo.
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts AVRO file or directory
+ * that has to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withAvroPath(String filePath) throws IOException;
+```
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts a directory of AVRO files and a
+ * list of selected files that have to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @param fileList list of selected files which have to be loaded under the given directory.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withAvroPath(String filePath, List<String> fileList) throws IOException;
+```
+
+Find example code at [AvroCarbonWriterTest](https://github.com/apache/carbondata/blob/master/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java) in the CarbonData repo.
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts JSON file or directory
+ * that has to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withJsonPath(String filePath) throws IOException;
+```
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts a directory of JSON files and a
+ * list of selected files that have to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @param fileList list of selected files which have to be loaded under the given directory.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withJsonPath(String filePath, List<String> fileList) throws IOException;
+```
+
+Find example code at [JSONCarbonWriterTest](https://github.com/apache/carbondata/blob/master/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/JSONCarbonWriterTest.java) in the CarbonData repo.
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts Parquet file or directory
+ * that has to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withParquetPath(String filePath) throws IOException;
+```
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts a directory of Parquet files and a
+ * list of selected files that have to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @param fileList list of selected files which have to be loaded under the given directory.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withParquetPath(String filePath, List<String> fileList) throws IOException;
+```
+
+Find example code at [ParquetCarbonWriterTest](https://github.com/apache/carbondata/blob/master/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ParquetCarbonWriterTest.java) in the CarbonData repo.
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts ORC file or directory
+ * that has to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withOrcPath(String filePath) throws IOException;
+```
+
+```
+/**
+ * To build a {@link CarbonWriter}, which accepts a directory of ORC files and a
+ * list of selected files that have to be loaded.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @param fileList list of selected files which have to be loaded under the given directory.
+ * @return CarbonWriterBuilder
+ */
+public CarbonWriterBuilder withOrcPath(String filePath, List<String> fileList) throws IOException;
+```
+
+Find example code at [ORCCarbonWriterTest](https://github.com/apache/carbondata/blob/master/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ORCCarbonWriterTest.java) in the CarbonData repo.
+
+```
+/**
* To support writing the ApplicationName which is writing the carbondata file
* This is a mandatory API to call, else the build() call will fail with error.
* @param application name which is writing the carbondata files
@@ -522,6 +647,22 @@ public abstract void write(Object object) throws IOException;
```
/**
+ * Write the data of given files iteratively, format of files depends on the implementation.
+ *
+ * @throws IOException
+ */
+public abstract void write() throws IOException;
+```
+
+```
+/**
+ * Set the list of carbon files which have to be loaded.
+ */
+public abstract void setDataFiles(CarbonFile[] dataFiles) throws IOException;
+```
+
+```
+/**
* Flush and close the writer
*/
public abstract void close() throws IOException;
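To complement the guide additions above, a hedged sketch (not part of the patch) of the
single-path variant; outputPath() and writtenBy() are the existing builder calls documented
earlier in the guide, and the paths are placeholders:
```java
import org.apache.carbondata.sdk.file.CarbonWriter;

public class ParquetLoadExample {
  public static void main(String[] args) throws Exception {
    // withParquetPath() accepts either a single .parquet file or a directory;
    // the builder extracts the schema and registers the discovered data files,
    // so the caller only needs the no-argument write() afterwards.
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/carbon_out")           // placeholder output location
        .writtenBy("ParquetLoadExample")         // mandatory application name
        .withParquetPath("/tmp/parquet_files")   // file or directory to load
        .build();
    writer.write();
    writer.close();
  }
}
```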
diff --git a/examples/spark/pom.xml b/examples/spark/pom.xml
index 3569de8..7540151 100644
--- a/examples/spark/pom.xml
+++ b/examples/spark/pom.xml
@@ -38,6 +38,12 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark_${spark.binary.version}</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index d5f5772..8d6d384 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -106,31 +106,67 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-cli</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-lucene</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-bloom</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-geo</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-streaming</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-mv-plan_${spark.binary.version}</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- spark -->
<dependency>
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
index 28e8afb..9656264 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
@@ -65,7 +65,7 @@ public class BinaryFieldConverterImpl implements FieldConverter {
throws RuntimeException {
if (value instanceof String) {
return binaryDecoder.decode((String) value);
- } else if (value instanceof byte[]) {
+ } else if (value instanceof byte[] || value == null) {
return value;
} else {
      throw new CarbonDataLoadingException("Binary only support String and byte[] data type," +
diff --git a/sdk/sdk/pom.xml b/sdk/sdk/pom.xml
index fcd9062..6e299d7 100644
--- a/sdk/sdk/pom.xml
+++ b/sdk/sdk/pom.xml
@@ -49,6 +49,28 @@
<version>${httpclient.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>1.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>3.1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-format</artifactId>
<version>0.12.0</version>
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 93271d2..c3b3c1d 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.sdk.file;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
@@ -36,6 +37,8 @@ import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -53,6 +56,7 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -76,12 +80,14 @@ import org.apache.log4j.Logger;
*/
@InterfaceAudience.Internal public class AvroCarbonWriter extends CarbonWriter {
+ private Configuration configuration;
private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
private TaskAttemptContext context;
private ObjectArrayWritable writable;
private Schema avroSchema;
private static final Logger LOGGER =
LogServiceFactory.getLogService(AvroCarbonWriter.class.getName());
+ private CarbonFile[] dataFiles;
AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws
IOException {
CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
@@ -94,6 +100,7 @@ import org.apache.log4j.Logger;
this.recordWriter = format.getRecordWriter(context);
this.context = context;
this.writable = new ObjectArrayWritable();
+ this.configuration = hadoopConf;
}
AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf, Schema
avroSchema)
@@ -102,6 +109,63 @@ import org.apache.log4j.Logger;
this.avroSchema = avroSchema;
}
+ @Override
+ public void setDataFiles(CarbonFile[] dataFiles) throws IOException {
+ if (dataFiles == null || dataFiles.length == 0) {
+ throw new RuntimeException("data files can't be empty.");
+ }
+ org.apache.avro.Schema avroSchema = null;
+ DataFileStream<GenericData.Record> avroReader = null;
+ for (CarbonFile dataFile : dataFiles) {
+ try {
+ avroReader = buildAvroReader(dataFile, this.configuration);
+ if (avroSchema == null) {
+ avroSchema = avroReader.getSchema();
+ } else {
+ if (!avroSchema.equals(avroReader.getSchema())) {
+          throw new RuntimeException("All the Avro files must be having the same schema.");
+ }
+ }
+ } finally {
+ if (avroReader != null) {
+ avroReader.close();
+ }
+ }
+ }
+ this.dataFiles = dataFiles;
+ }
+
+ /**
+ * TO extract the avro schema from the given avro file
+ */
+ public static org.apache.avro.Schema extractAvroSchema(CarbonFile dataFile,
Configuration conf)
+ throws IOException {
+ DataFileStream<GenericData.Record> avroReader = null;
+ try {
+ avroReader = buildAvroReader(dataFile, conf);
+ return avroReader.getSchema();
+ } finally {
+ if (avroReader != null) {
+ avroReader.close();
+ }
+ }
+ }
+
+  private static DataFileStream<GenericData.Record> buildAvroReader(CarbonFile dataFile,
+ Configuration configuration) throws IOException {
+ try {
+ GenericDatumReader<GenericData.Record> genericDatumReader =
+ new GenericDatumReader<>();
+ DataFileStream<GenericData.Record> avroReader =
+          new DataFileStream<>(FileFactory.getDataInputStream(dataFile.getPath(),
+ -1, configuration), genericDatumReader);
+ return avroReader;
+ } catch (FileNotFoundException ex) {
+ throw new FileNotFoundException("File " + dataFile.getPath()
+ + " not found to build carbon writer.");
+ }
+ }
+
private Object[] avroToCsv(GenericData.Record avroRecord) {
if (avroSchema == null) {
avroSchema = avroRecord.getSchema();
@@ -824,6 +888,36 @@ import org.apache.log4j.Logger;
}
/**
+ * Load data of all avro files at given location iteratively.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void write() throws IOException {
+ if (this.dataFiles == null || this.dataFiles.length == 0) {
+      throw new RuntimeException("'withAvroPath()' must be called to support loading avro files");
+ }
+ for (CarbonFile dataFile : this.dataFiles) {
+ this.loadSingleFile(dataFile);
+ }
+ }
+
+ private void loadSingleFile(CarbonFile file) throws IOException {
+ DataFileStream<GenericData.Record> avroReader = null;
+ try {
+ avroReader = buildAvroReader(file, this.configuration);
+ while (avroReader.hasNext()) {
+ GenericData.Record record = avroReader.next();
+ this.write(record);
+ }
+ } finally {
+ if (avroReader != null) {
+ avroReader.close();
+ }
+ }
+ }
+
+ /**
* Flush and close the writer
*/
@Override
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index aa5c671..4851e10 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -17,15 +17,22 @@
package org.apache.carbondata.sdk.file;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobID;
@@ -42,9 +49,13 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@InterfaceAudience.Internal
class CSVCarbonWriter extends CarbonWriter {
+ private Configuration configuration;
private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
private TaskAttemptContext context;
private ObjectArrayWritable writable;
+ private CsvParser csvParser = null;
+ private boolean skipHeader = false;
+ private CarbonFile[] dataFiles;
CSVCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws
IOException {
CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
@@ -57,6 +68,11 @@ class CSVCarbonWriter extends CarbonWriter {
this.recordWriter = format.getRecordWriter(context);
this.context = context;
this.writable = new ObjectArrayWritable();
+ this.configuration = hadoopConf;
+ }
+
+ public void setSkipHeader(boolean skipHeader) {
+ this.skipHeader = skipHeader;
}
/**
@@ -72,6 +88,75 @@ class CSVCarbonWriter extends CarbonWriter {
}
}
+ private CsvParser buildCsvParser(Configuration conf) {
+ CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(conf);
+ return new CsvParser(settings);
+ }
+
+ @Override
+ public void setDataFiles(CarbonFile[] dataFiles) throws IOException {
+ if (dataFiles == null || dataFiles.length == 0) {
+ throw new RuntimeException("data files can't be empty.");
+ }
+ DataInputStream csvInputStream = null;
+ CsvParser csvParser = this.buildCsvParser(this.configuration);
+ for (CarbonFile dataFile : dataFiles) {
+ try {
+ csvInputStream = FileFactory.getDataInputStream(dataFile.getPath(),
+ -1, this.configuration);
+ csvParser.beginParsing(csvInputStream);
+ } catch (IllegalArgumentException ex) {
+ if (ex.getCause() instanceof FileNotFoundException) {
+ throw new FileNotFoundException("File " + dataFile +
+ " not found to build carbon writer.");
+ }
+ throw ex;
+ } finally {
+ if (csvInputStream != null) {
+ csvInputStream.close();
+ }
+ }
+ }
+ this.dataFiles = dataFiles;
+ }
+
+ /**
+ * Load data of all or selected csv files at given location iteratively.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void write() throws IOException {
+ if (this.dataFiles == null || this.dataFiles.length == 0) {
+      throw new RuntimeException("'withCsvPath()' must be called to support load files");
+ }
+ this.csvParser = this.buildCsvParser(this.configuration);
+ for (CarbonFile dataFile : this.dataFiles) {
+ this.loadSingleFile(dataFile);
+ }
+ }
+
+ private void loadSingleFile(CarbonFile file) throws IOException {
+ DataInputStream csvDataInputStream = FileFactory
+ .getDataInputStream(file.getPath(), -1, this.configuration);
+ this.csvParser.beginParsing(csvDataInputStream);
+ String[] row;
+ boolean skipFirstRow = this.skipHeader;
+ try {
+ while ((row = this.csvParser.parseNext()) != null) {
+ if (skipFirstRow) {
+ skipFirstRow = false;
+ continue;
+ }
+ this.write(row);
+ }
+ } finally {
+ if (csvDataInputStream != null) {
+ csvDataInputStream.close();
+ }
+ }
+ }
+
/**
* Flush and close the writer
*/
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java
index 60ad060..d83cfbb 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
/**
* Writer to write row data to carbondata file. Call {@link #builder()} to get
@@ -49,4 +50,14 @@ public abstract class CarbonWriter {
return new CarbonWriterBuilder();
}
+ /**
+   * Write the data of files iteratively, format of files depends on the implementation.
+ */
+ public abstract void write() throws IOException;
+
+ /**
+ * Set the list of carbon files.
+ */
+ public abstract void setDataFiles(CarbonFile[] dataFiles) throws IOException;
+
}
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 87bd705..4d5ed07 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -35,7 +35,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -53,8 +55,12 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+import org.apache.carbondata.sdk.file.utils.SDKUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
/**
* Builder for {@link CarbonWriter}
@@ -63,7 +69,7 @@ import org.apache.hadoop.conf.Configuration;
@InterfaceStability.Unstable
public class CarbonWriterBuilder {
private Schema schema;
- private org.apache.avro.Schema avroSchema;
+ private org.apache.avro.Schema avroSchema;
private String path;
  //initialize with empty array , as no columns should be selected for sorting in NO_SORT
private String[] sortColumns = new String[0];
@@ -78,13 +84,20 @@ public class CarbonWriterBuilder {
private String taskNo;
private int localDictionaryThreshold;
private boolean isLocalDictionaryEnabled = Boolean.parseBoolean(
- CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT);
+ CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT);
private short numOfThreads;
private Configuration hadoopConf;
private String writtenByApp;
private String[] invertedIndexColumns;
+ private String filePath;
+ private boolean isDirectory = false;
+ private List<String> fileList;
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(CarbonWriterBuilder.class.getName());
+ private CarbonFile[] dataFiles;
+
private enum WRITER_TYPE {
- CSV, AVRO, JSON
+ CSV, AVRO, JSON, PARQUET, ORC
}
private WRITER_TYPE writerType;
@@ -497,7 +510,6 @@ public class CarbonWriterBuilder {
* To set the blocklet size of CarbonData file
*
* @param pageSizeInMb is page size in MB
- *
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withPageSizeInMb(int pageSizeInMb) {
@@ -595,6 +607,219 @@ public class CarbonWriterBuilder {
}
/**
+ * to build a {@link CarbonWriter}, which accepts loading CSV files.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+ public CarbonWriterBuilder withCsvPath(String filePath) throws IOException {
+ this.validateFilePath(filePath);
+ this.filePath = filePath;
+ this.setIsDirectory(filePath);
+ this.withCsvInput();
+    this.dataFiles = this.extractDataFiles(CarbonCommonConstants.CSV_FILE_EXTENSION);
+ return this;
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts CSV files directory and
+ * list of file which has to be loaded.
+ *
+ * @param filePath directory where the CSV file exists.
+ * @param fileList list of files which has to be loaded.
+ * @return CarbonWriterBuilder
+ */
+  public CarbonWriterBuilder withCsvPath(String filePath, List<String> fileList)
+ throws IOException {
+ this.fileList = fileList;
+ this.withCsvPath(filePath);
+ return this;
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts loading JSON files.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+ public CarbonWriterBuilder withJsonPath(String filePath) throws IOException {
+ this.validateFilePath(filePath);
+ this.filePath = filePath;
+ this.setIsDirectory(filePath);
+ this.withJsonInput();
+    this.dataFiles = this.extractDataFiles(CarbonCommonConstants.JSON_FILE_EXTENSION);
+ return this;
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts JSON file directory and
+ * list of file which has to be loaded.
+ *
+ * @param filePath directory where the json file exists.
+ * @param fileList list of files which has to be loaded.
+ * @return CarbonWriterBuilder
+ * @throws IOException
+ */
+  public CarbonWriterBuilder withJsonPath(String filePath, List<String> fileList)
+ throws IOException {
+ this.fileList = fileList;
+ this.withJsonPath(filePath);
+ return this;
+ }
+
+ private void validateFilePath(String filePath) {
+ if (StringUtils.isEmpty(filePath)) {
+ throw new IllegalArgumentException("filePath can not be empty");
+ }
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts loading Parquet files.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+  public CarbonWriterBuilder withParquetPath(String filePath) throws IOException {
+ this.validateFilePath(filePath);
+ this.filePath = filePath;
+ this.setIsDirectory(filePath);
+ this.writerType = WRITER_TYPE.PARQUET;
+    CarbonFile[] dataFiles = this.extractDataFiles(CarbonCommonConstants.PARQUET_FILE_EXT);
+ org.apache.avro.Schema parquetSchema = ParquetCarbonWriter
+ .extractParquetSchema(dataFiles[0], this.hadoopConf);
+ this.dataFiles = dataFiles;
+ this.avroSchema = parquetSchema;
+    this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(this.avroSchema);
+ return this;
+ }
+
+ private void setIsDirectory(String filePath) {
+ if (this.hadoopConf == null) {
+ this.hadoopConf = new Configuration(FileFactory.getConfiguration());
+ }
+ CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, hadoopConf);
+ this.isDirectory = carbonFile.isDirectory();
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts parquet files directory and
+ * list of file which has to be loaded.
+ *
+ * @param filePath directory where the parquet file exists.
+ * @param fileList list of files which has to be loaded.
+ * @return CarbonWriterBuilder
+ * @throws IOException
+ */
+  public CarbonWriterBuilder withParquetPath(String filePath, List<String> fileList)
+ throws IOException {
+ this.fileList = fileList;
+ this.withParquetPath(filePath);
+ return this;
+ }
+
+ private CarbonFile[] extractDataFiles(String suf) {
+ List<CarbonFile> dataFiles;
+ if (this.isDirectory) {
+ if (CollectionUtils.isEmpty(this.fileList)) {
+        dataFiles = SDKUtil.extractFilesFromFolder(this.filePath, suf, this.hadoopConf);
+ } else {
+ dataFiles = this.appendFileListWithPath();
+ }
+ } else {
+ dataFiles = new ArrayList<>();
+ dataFiles.add(FileFactory.getCarbonFile(this.filePath, this.hadoopConf));
+ }
+ if (CollectionUtils.isEmpty(dataFiles)) {
+ throw new RuntimeException("Data files can't be empty.");
+ }
+ return dataFiles.toArray(new CarbonFile[0]);
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts loading ORC files.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+ public CarbonWriterBuilder withOrcPath(String filePath) throws IOException {
+ this.validateFilePath(filePath);
+ this.filePath = filePath;
+ this.setIsDirectory(filePath);
+ this.writerType = WRITER_TYPE.ORC;
+ Map<String, String> options = new HashMap<>();
+ options.put("complex_delimiter_level_1",
+ CarbonCommonConstants.COMPLEX_DELIMITER_LEVEL_1_DEFAULT);
+ options.put("complex_delimiter_level_2",
+ CarbonCommonConstants.COMPLEX_DELIMITER_LEVEL_2_DEFAULT);
+ options.put("complex_delimiter_level_3",
+ CarbonCommonConstants.COMPLEX_DELIMITER_LEVEL_3_DEFAULT);
+ this.withLoadOptions(options);
+    CarbonFile[] dataFiles = this.extractDataFiles(CarbonCommonConstants.ORC_FILE_EXTENSION);
+ this.dataFiles = dataFiles;
+    this.schema = ORCCarbonWriter.extractOrcFileSchema(dataFiles[0], this.hadoopConf);
+ return this;
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts orc files directory and
+ * list of file which has to be loaded.
+ *
+ * @param filePath directory where the orc file exists.
+ * @param fileList list of files which has to be loaded.
+ * @return CarbonWriterBuilder
+ * @throws IOException
+ */
+  public CarbonWriterBuilder withOrcPath(String filePath, List<String> fileList)
+ throws IOException {
+ this.fileList = fileList;
+ this.withOrcPath(filePath);
+ return this;
+ }
+
+ private List<CarbonFile> appendFileListWithPath() {
+ List<CarbonFile> dataFiles = new ArrayList<>();
+ for (String file : this.fileList) {
+ dataFiles.add(FileFactory.getCarbonFile(this.filePath +
+ CarbonCommonConstants.FILE_SEPARATOR + file, this.hadoopConf));
+ }
+ return dataFiles;
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts loading AVRO files.
+ *
+ * @param filePath absolute path under which files should be loaded.
+ * @return CarbonWriterBuilder
+ */
+ public CarbonWriterBuilder withAvroPath(String filePath) throws IOException {
+ this.validateFilePath(filePath);
+ this.filePath = filePath;
+ this.setIsDirectory(filePath);
+ this.writerType = WRITER_TYPE.AVRO;
+    CarbonFile[] dataFiles = this.extractDataFiles(CarbonCommonConstants.AVRO_FILE_EXTENSION);
+    this.avroSchema = AvroCarbonWriter.extractAvroSchema(dataFiles[0], this.hadoopConf);
+    this.dataFiles = dataFiles;
+    this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(this.avroSchema);
+ return this;
+ }
+
+ /**
+ * to build a {@link CarbonWriter}, which accepts avro file directory and
+ * list of file which has to be loaded.
+ *
+ * @param filePath directory where the avro file exists.
+ * @param fileList list of files which has to be loaded.
+ * @return CarbonWriterBuilder
+ * @throws IOException
+ */
+  public CarbonWriterBuilder withAvroPath(String filePath, List<String> fileList)
+ throws IOException {
+ this.fileList = fileList;
+ this.withAvroPath(filePath);
+ return this;
+ }
+
+ /**
* to build a {@link CarbonWriter}, which accepts row in Json format
*
* @return CarbonWriterBuilder
@@ -654,20 +879,36 @@ public class CarbonWriterBuilder {
if (hadoopConf == null) {
hadoopConf = new Configuration(FileFactory.getConfiguration());
}
+ CarbonWriter carbonWriter;
if (this.writerType == WRITER_TYPE.AVRO) {
      // AVRO records are pushed to Carbon as Object not as Strings. This was done in order to
      // handle multi level complex type support. As there are no conversion converter step is
      // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder
      // which will skip Conversion Step.
loadModel.setLoadWithoutConverterStep(true);
- return new AvroCarbonWriter(loadModel, hadoopConf, this.avroSchema);
+ carbonWriter = new AvroCarbonWriter(loadModel,
+ hadoopConf, this.avroSchema);
} else if (this.writerType == WRITER_TYPE.JSON) {
loadModel.setJsonFileLoad(true);
- return new JsonCarbonWriter(loadModel, hadoopConf);
+ carbonWriter = new JsonCarbonWriter(loadModel, hadoopConf);
+ } else if (this.writerType == WRITER_TYPE.PARQUET) {
+ loadModel.setLoadWithoutConverterStep(true);
+      carbonWriter = new ParquetCarbonWriter(loadModel, hadoopConf, this.avroSchema);
+ } else if (this.writerType == WRITER_TYPE.ORC) {
+ carbonWriter = new ORCCarbonWriter(loadModel, hadoopConf);
} else {
// CSV
- return new CSVCarbonWriter(loadModel, hadoopConf);
+      CSVCarbonWriter csvCarbonWriter = new CSVCarbonWriter(loadModel, hadoopConf);
+ if (!StringUtils.isEmpty(filePath) &&
+ !this.options.containsKey(CarbonCommonConstants.FILE_HEADER)) {
+ csvCarbonWriter.setSkipHeader(true);
+ }
+ carbonWriter = csvCarbonWriter;
+ }
+ if (!StringUtils.isEmpty(this.filePath)) {
+ carbonWriter.setDataFiles(this.dataFiles);
}
+ return carbonWriter;
}
private void setCsvHeader(CarbonLoadModel model) {
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
index 5785d1e..560a292 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
@@ -17,15 +17,24 @@
package org.apache.carbondata.sdk.file;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@@ -36,6 +45,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONArray;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
/**
* Writer Implementation to write Json Record to carbondata file.
@@ -43,9 +56,13 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
*/
@InterfaceAudience.User
public class JsonCarbonWriter extends CarbonWriter {
+ private Configuration configuration;
private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
private TaskAttemptContext context;
private ObjectArrayWritable writable;
+ private CarbonFile[] dataFiles;
+ private static final Logger LOGGER =
+      LogServiceFactory.getLogService(UnsafeInmemoryMergeHolder.class.getName());
JsonCarbonWriter(CarbonLoadModel loadModel, Configuration configuration)
throws IOException {
CarbonTableOutputFormat.setLoadModel(configuration, loadModel);
@@ -58,6 +75,37 @@ public class JsonCarbonWriter extends CarbonWriter {
this.recordWriter = outputFormat.getRecordWriter(context);
this.context = context;
this.writable = new ObjectArrayWritable();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void setDataFiles(CarbonFile[] dataFiles) throws IOException {
+ if (dataFiles == null || dataFiles.length == 0) {
+ throw new RuntimeException("data files can't be empty.");
+ }
+ Reader jsonReader = null;
+ for (CarbonFile dataFile : dataFiles) {
+ try {
+ jsonReader = this.buildJsonReader(dataFile, this.configuration);
+ new JSONParser().parse(jsonReader);
+ } catch (FileNotFoundException ex) {
+        throw new FileNotFoundException("File " + dataFile + " not found to build carbon writer.");
+ } catch (ParseException ex) {
+        throw new RuntimeException("File " + dataFile + " is not in json format.");
+ } finally {
+ if (jsonReader != null) {
+ jsonReader.close();
+ }
+ }
+ }
+ this.dataFiles = dataFiles;
+ }
+
+ private java.io.Reader buildJsonReader(CarbonFile file, Configuration conf)
+ throws IOException {
+    InputStream inputStream = FileFactory.getDataInputStream(file.getPath(), -1, conf);
+    java.io.Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+ return reader;
}
/**
@@ -91,4 +139,43 @@ public class JsonCarbonWriter extends CarbonWriter {
throw new IOException(e);
}
}
+
+ private void loadSingleFile(CarbonFile file) throws IOException {
+ Reader reader = null;
+ try {
+ reader = this.buildJsonReader(file, configuration);
+ JSONParser jsonParser = new JSONParser();
+ Object jsonRecord = jsonParser.parse(reader);
+ if (jsonRecord instanceof JSONArray) {
+ JSONArray jsonArray = (JSONArray) jsonRecord;
+ for (Object record : jsonArray) {
+ this.write(record.toString());
+ }
+ } else {
+ this.write(jsonRecord.toString());
+ }
+ } catch (ParseException ex) {
+ LOGGER.error(ex);
+ throw new IOException(ex.getMessage());
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ /**
+ * Load data of all or selected json files at given location iteratively.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void write() throws IOException {
+ if (this.dataFiles == null || this.dataFiles.length == 0) {
+      throw new RuntimeException("'withJsonPath()' must be called to support load json files");
+ }
+ for (CarbonFile dataFile : this.dataFiles) {
+ this.loadSingleFile(dataFile);
+ }
+ }
}
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ORCCarbonWriter.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ORCCarbonWriter.java
new file mode 100644
index 0000000..f93f9fd
--- /dev/null
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ORCCarbonWriter.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.metadata.datatype.MapType;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.orc.FileFormatException;
+import org.apache.orc.TypeDescription;
+
+
+/**
+ * Implementation to write ORC rows in CSV format to carbondata file.
+ */
+public class ORCCarbonWriter extends CarbonWriter {
+ private Configuration configuration;
+ private Reader orcReader = null;
+ private CarbonFile[] dataFiles;
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(ORCCarbonWriter.class.getName());
+ private CSVCarbonWriter csvCarbonWriter;
+
+ ORCCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf)
+ throws IOException {
+ this.csvCarbonWriter = new CSVCarbonWriter(loadModel, hadoopConf);
+ this.configuration = hadoopConf;
+ }
+
+ @Override
+ public void setDataFiles(CarbonFile[] dataFiles) throws IOException {
+ if (dataFiles == null || dataFiles.length == 0) {
+ throw new RuntimeException("data files can't be empty.");
+ }
+ this.compareAllOrcFilesSchema(dataFiles);
+ this.dataFiles = dataFiles;
+ }
+
+ private void compareAllOrcFilesSchema(CarbonFile[] dataFiles) throws
IOException {
+ TypeDescription orcSchema = null;
+ for (CarbonFile dataFile : dataFiles) {
+ Reader orcReader = buildOrcReader(dataFile.getPath(),
this.configuration);
+ if (orcSchema == null) {
+ orcSchema = orcReader.getSchema();
+ } else {
+ if (!orcSchema.toString().equals(orcReader.getSchema().toString())) {
+          throw new RuntimeException("All the ORC files must be having the same schema.");
+ }
+ }
+ }
+ }
+
+ // extract child schema from the ORC type description.
+ private static Field[] childSchema(Field[] childs,
+ List<TypeDescription> childSchemas, List<String> fieldsName) {
+ if (childSchemas != null) {
+ for (int i = 0; i < childSchemas.size(); i++) {
+ List<String> fieldList = null;
+ try {
+ if (childSchemas.get(i) != null) {
+ fieldList = childSchemas.get(i).getFieldNames();
+ }
+ } catch (NullPointerException ex) {
+ LOGGER.info("Field names of given column is null");
+ }
+ childs[i] = orcToCarbonSchemaConverter(childSchemas.get(i), fieldList,
+ fieldsName == null ? null : fieldsName.get(i));
+ }
+ }
+ return childs;
+ }
+
+ private static Reader buildOrcReader(String path, Configuration conf) throws
IOException {
+ try {
+ Reader orcReader = OrcFile.createReader(new Path(path),
+ OrcFile.readerOptions(conf));
+ return orcReader;
+ } catch (FileFormatException ex) {
+ throw new RuntimeException("File " + path + " is not in ORC format");
+ } catch (FileNotFoundException ex) {
+ throw new FileNotFoundException("File " + path + " not found to build
carbon writer.");
+ }
+ }
+
+ // Extract the schema of ORC file and convert into carbon schema.
+ public static Schema extractOrcFileSchema(CarbonFile dataFile, Configuration
conf)
+ throws IOException {
+ Reader orcReader;
+ orcReader = buildOrcReader(dataFile.getPath(), conf);
+ TypeDescription typeDescription = orcReader.getSchema();
+ List<String> fieldList = null;
+ Schema schema;
+ try {
+ fieldList = typeDescription.getFieldNames();
+ } catch (NullPointerException e) {
+ LOGGER.info("Field names of given file is null.");
+ }
+ Field field = orcToCarbonSchemaConverter(typeDescription,
+ fieldList, typeDescription.getCategory().getName());
+ String fieldType = field.getDataType().toString();
+ if (fieldType.equalsIgnoreCase(CarbonCommonConstants.STRUCT)) {
+ int size = field.getChildren().size();
+ Field[] fields = new Field[size];
+ for (int i = 0; i < size; i++) {
+ StructField columnDetails = field.getChildren().get(i);
+ fields[i] = new Field(columnDetails.getFieldName(),
+ columnDetails.getDataType(), columnDetails.getChildren());
+ }
+ schema = new Schema(fields);
+ } else {
+ Field[] fields = new Field[1];
+ fields[0] = field;
+ schema = new Schema(fields);
+ }
+ return schema;
+ }
+
+ // TO convert ORC schema to carbon schema
+ private static Field orcToCarbonSchemaConverter(TypeDescription
typeDescription,
+ List<String> fieldsName, String colName) {
+ Objects.requireNonNull(typeDescription, "orc typeDescription should not be
null");
+ Objects.requireNonNull(typeDescription.getCategory(),
+ "typeDescription category should not be null");
+ if (colName == null) {
+ colName = typeDescription.getCategory().getName();
+ }
+ switch (typeDescription.getCategory()) {
+ case BOOLEAN:
+ return new Field(colName, "boolean");
+ case BYTE:
+ case BINARY:
+ return new Field(colName, "binary");
+ case SHORT:
+ return new Field(colName, "short");
+ case INT:
+ return new Field(colName, "int");
+ case LONG:
+ return new Field(colName, "long");
+ case FLOAT:
+ return new Field(colName, "float");
+ case DOUBLE:
+ return new Field(colName, "double");
+ case DECIMAL:
+ return new Field(colName, "decimal");
+ case STRING:
+ return new Field(colName, "string");
+ case CHAR:
+ case VARCHAR:
+ return new Field(colName, "varchar");
+ case DATE:
+ return new Field(colName, "date");
+ case TIMESTAMP:
+ return new Field(colName, "timestamp");
+ case STRUCT:
+ List<TypeDescription> childSchemas = typeDescription.getChildren();
+ Field[] childs = new Field[childSchemas.size()];
+ childSchema(childs, childSchemas, fieldsName);
+ List<StructField> structList = new ArrayList<>();
+ for (int i = 0; i < childSchemas.size(); i++) {
+ structList.add(new StructField(childs[i].getFieldName(),
+ childs[i].getDataType(), childs[i].getChildren()));
+ }
+ return new Field(colName, "struct", structList);
+ case LIST:
+ childSchemas = typeDescription.getChildren();
+ childs = new Field[childSchemas.size()];
+ childSchema(childs, childSchemas, fieldsName);
+ List<StructField> arrayField = new ArrayList<>();
+ for (int i = 0; i < childSchemas.size(); i++) {
+ arrayField.add(new StructField(childs[i].getFieldName(),
+ childs[i].getDataType(), childs[i].getChildren()));
+ }
+ return new Field(colName, "array", arrayField);
+ case MAP:
+ childSchemas = typeDescription.getChildren();
+ childs = new Field[childSchemas.size()];
+ childSchema(childs, childSchemas, fieldsName);
+ ArrayList<StructField> keyValueFields = new ArrayList<>();
+ StructField keyField = new
StructField(typeDescription.getCategory().getName() + ".key",
+ childs[0].getDataType());
+ StructField valueField = new
StructField(typeDescription.getCategory().getName() + ".value",
+ childs[1].getDataType(), childs[1].getChildren());
+ keyValueFields.add(keyField);
+ keyValueFields.add(valueField);
+ StructField mapKeyValueField =
+ new StructField(typeDescription.getCategory().getName() + ".val",
+ DataTypes.createStructType(keyValueFields), keyValueFields);
+ MapType mapType =
+ DataTypes.createMapType(DataTypes.STRING,
mapKeyValueField.getDataType());
+ List<StructField> mapStructFields = new ArrayList<>();
+ mapStructFields.add(mapKeyValueField);
+ return new Field(colName, mapType, mapStructFields);
+ default:
+ throw new UnsupportedOperationException(
+ "carbon not support " + typeDescription.getCategory().getName() +
" orc type yet");
+ }
+ }
+
+ @Override
+ public void write(Object object) {
+    throw new UnsupportedOperationException("Carbon doesn't support writing a single ORC object");
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.csvCarbonWriter.close();
+ }
+
+ /**
+ * Load ORC file in iterative way.
+ */
+ @Override
+ public void write() throws IOException {
+ if (this.dataFiles == null || this.dataFiles.length == 0) {
+      throw new RuntimeException("'withOrcPath()' must be called to support loading ORC files");
+ }
+ for (CarbonFile dataFile : this.dataFiles) {
+ this.loadSingleFile(dataFile);
+ }
+ }
+
+ private void loadSingleFile(CarbonFile file) throws IOException {
+ orcReader = buildOrcReader(file.getPath(), this.configuration);
+ ObjectInspector objectInspector = orcReader.getObjectInspector();
+ RecordReader recordReader = orcReader.rows();
+ Object record = null;
+ if (objectInspector instanceof StructObjectInspector) {
+ StructObjectInspector structObjectInspector =
+ (StructObjectInspector) orcReader.getObjectInspector();
+ while (recordReader.hasNext()) {
+ record = recordReader.next(record);
+ List valueList =
structObjectInspector.getStructFieldsDataAsList(record);
+ for (int i = 0; i < valueList.size(); i++) {
+ valueList.set(i, parseOrcObject(valueList.get(i), 0));
+ }
+ this.csvCarbonWriter.write(valueList.toArray());
+ }
+ } else {
+ while (recordReader.hasNext()) {
+ record = recordReader.next(record);
+ this.csvCarbonWriter.write(new Object[]{parseOrcObject(record, 0)});
+ }
+ }
+ }
+
+ private String parseOrcObject(Object recordObject, int level) {
+ if (recordObject instanceof OrcStruct) {
+ Objects.requireNonNull(orcReader);
+ StructObjectInspector structObjectInspector = (StructObjectInspector)
orcReader
+ .getObjectInspector();
+ List value =
structObjectInspector.getStructFieldsDataAsList(recordObject);
+ for (int i = 0; i < value.size(); i++) {
+ value.set(i, parseOrcObject(value.get(i), level + 1));
+ }
+ String str = listToString(value, level);
+ if (str.length() > 0) {
+ return str.substring(0, str.length() - 1);
+ }
+ return null;
+ } else if (recordObject instanceof ArrayList) {
+ ArrayList listValue = (ArrayList) recordObject;
+ for (int i = 0; i < listValue.size(); i++) {
+ listValue.set(i, parseOrcObject(listValue.get(i), level + 1));
+ }
+ String str = listToString(listValue, level);
+ if (str.length() > 0) {
+ return str.substring(0, str.length() - 1);
+ }
+ return null;
+ } else if (recordObject instanceof LinkedHashMap) {
+ LinkedHashMap<Text, Object> keyValueRow = (LinkedHashMap<Text, Object>)
recordObject;
+ for (Map.Entry<Text, Object> entry : keyValueRow.entrySet()) {
+ Object val = parseOrcObject(keyValueRow.get(entry.getKey()), level +
2);
+ keyValueRow.put(entry.getKey(), val);
+ }
+ StringBuilder str = new StringBuilder();
+ for (Map.Entry<Text, Object> entry : keyValueRow.entrySet()) {
+ Text key = entry.getKey();
+
str.append(key.toString()).append("$").append(keyValueRow.get(key)).append("#");
+ }
+ if (str.length() > 0) {
+ return str.substring(0, str.length() - 1);
+ }
+ return null;
+ }
+ if (recordObject == null) {
+ return null;
+ }
+ return recordObject.toString();
+ }
+
+ private String listToString(List value, int level) {
+ String delimiter;
+ if (level == 0) {
+ delimiter = CarbonCommonConstants.COMPLEX_DELIMITER_LEVEL_1_DEFAULT;
+ } else if (level == 1) {
+ delimiter = CarbonCommonConstants.COMPLEX_DELIMITER_LEVEL_2_DEFAULT;
+ } else if (level == 2) {
+ delimiter = CarbonCommonConstants.COMPLEX_DELIMITER_LEVEL_3_DEFAULT;
+ } else {
+      throw new RuntimeException("carbon only support three level of ORC complex schema");
+ }
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < value.size(); i++) {
+ str.append(value.get(i)).append(delimiter);
+ }
+ return str.toString();
+ }
+}
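A simplified re-sketch (not the patch code) of how the listToString()/parseOrcObject() logic
above flattens nested ORC values into a single CSV cell using the default complex delimiters
'#', '$' and '@'; the sample value is illustrative only:
```java
import java.util.Arrays;
import java.util.List;

public class OrcFlatteningSketch {
  // Join nested lists with the level 1/2/3 delimiters, mirroring listToString() above.
  static String join(List<?> values, int level) {
    String delimiter = level == 0 ? "#" : level == 1 ? "$" : "@";
    StringBuilder out = new StringBuilder();
    for (Object value : values) {
      out.append(value instanceof List ? join((List<?>) value, level + 1) : value)
          .append(delimiter);
    }
    return out.length() > 0 ? out.substring(0, out.length() - 1) : null;
  }

  public static void main(String[] args) {
    // An ORC column of type array<struct<a:int, b:string>> holding [(1, "x"), (2, "y")]
    // is flattened to the single cell "1$x#2$y".
    System.out.println(join(Arrays.asList(Arrays.asList(1, "x"), Arrays.asList(2, "y")), 0));
  }
}
```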
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ParquetCarbonWriter.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ParquetCarbonWriter.java
new file mode 100644
index 0000000..c5486ba
--- /dev/null
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ParquetCarbonWriter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+
+/**
+ * Implementation to write parquet rows in avro format to carbondata file.
+ */
+public class ParquetCarbonWriter extends CarbonWriter {
+ private Configuration configuration;
+ private CarbonFile[] dataFiles;
+ private AvroCarbonWriter avroCarbonWriter;
+
+ ParquetCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf, Schema avroSchema)
+ throws IOException {
+ this.avroCarbonWriter = new AvroCarbonWriter(loadModel, hadoopConf, avroSchema);
+ this.configuration = hadoopConf;
+ }
+
+ @Override
+ public void setDataFiles(CarbonFile[] dataFiles) throws IOException {
+ if (dataFiles == null || dataFiles.length == 0) {
+ throw new RuntimeException("data files can't be empty.");
+ }
+ Schema parquetSchema = null;
+ for (CarbonFile dataFile : dataFiles) {
+ Schema currentFileSchema = extractParquetSchema(dataFile,
+ this.configuration);
+ if (parquetSchema == null) {
+ parquetSchema = currentFileSchema;
+ } else {
+ if (!parquetSchema.equals(currentFileSchema)) {
+ throw new RuntimeException("All the parquet files must be having the
same schema.");
+ }
+ }
+ }
+ this.dataFiles = dataFiles;
+ }
+
+ /**
+ * Extracts the parquet schema from the given parquet file.
+ */
+ public static Schema extractParquetSchema(CarbonFile dataFile,
+ Configuration configuration) throws IOException {
+ ParquetReader<GenericRecord> parquetReader =
+ buildParquetReader(dataFile.getPath(), configuration);
+ Schema parquetSchema = parquetReader.read().getSchema();
+ parquetReader.close();
+ return parquetSchema;
+ }
+
+ private static ParquetReader<GenericRecord> buildParquetReader(String path, Configuration conf)
+ throws IOException {
+ try {
+ AvroReadSupport<GenericRecord> avroReadSupport = new AvroReadSupport<>();
+ return ParquetReader.builder(avroReadSupport,
+ new Path(path)).withConf(conf).build();
+ } catch (FileNotFoundException ex) {
+ throw new FileNotFoundException("File " + path + " not found to build carbon writer.");
+ }
+ }
+
+ @Override
+ public void write(Object object) {
+ throw new UnsupportedOperationException("Carbon doesn't " +
+ "support writing a single Parquet object");
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.avroCarbonWriter.close();
+ }
+
+ /**
+ * Loads data from all parquet files at the given location iteratively.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void write() throws IOException {
+ if (this.dataFiles == null || this.dataFiles.length == 0) {
+ throw new RuntimeException("'withParquetPath()' " +
+ "must be called to support loading parquet files");
+ }
+ for (CarbonFile dataFile : this.dataFiles) {
+ this.loadSingleFile(dataFile);
+ }
+ }
+
+ private void loadSingleFile(CarbonFile file) throws IOException {
+ ParquetReader<GenericRecord> parquetReader = null;
+ try {
+ parquetReader = buildParquetReader(file.getPath(), this.configuration);
+ GenericRecord genericRecord;
+ while ((genericRecord = parquetReader.read()) != null) {
+ this.avroCarbonWriter.write(genericRecord);
+ }
+ } finally {
+ if (parquetReader != null) {
+ parquetReader.close();
+ }
+ }
+ }
+}
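
A minimal usage sketch for ParquetCarbonWriter, based on the parquet tests added later in this patch; the folder path, file names, and class name are illustrative placeholders. The two-argument withParquetPath variant restricts the load to the listed files:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.CarbonWriterBuilder;

    public class ParquetLoadExample {
      public static void main(String[] args) throws IOException, InvalidLoadOptionException {
        // Load only the listed parquet files from the folder; all files must share one schema.
        List<String> fileList = new ArrayList<>();
        fileList.add("file1.parquet");
        fileList.add("file3.parquet");
        CarbonWriter writer = new CarbonWriterBuilder()
            .withParquetPath("/path/to/parquet_files", fileList)
            .outputPath("/path/to/carbon_output")
            .writtenBy("ParquetLoadExample")
            .build();
        writer.write();   // each file is read as Avro GenericRecords and written through AvroCarbonWriter
        writer.close();
      }
    }
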
diff --git
a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
index 0362374..a334746 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
public class SDKUtil {
@@ -79,4 +80,17 @@ public class SDKUtil {
return (Object[]) input[i];
}
+ public static List<CarbonFile> extractFilesFromFolder(String path,
+ String suf, Configuration hadoopConf) {
+ List dataFiles = listFiles(path, suf, hadoopConf);
+ List<CarbonFile> carbonFiles = new ArrayList<>();
+ for (Object dataFile: dataFiles) {
+ carbonFiles.add(FileFactory.getCarbonFile(dataFile.toString(), hadoopConf));
+ }
+ if (CollectionUtils.isEmpty(dataFiles)) {
+ throw new RuntimeException("No file found at given location. Please
provide" +
+ "the correct folder location.");
+ }
+ return carbonFiles;
+ }
}
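
A small sketch of how the new helper can be used; the folder path and the ".csv" suffix are illustrative, and the builder presumably calls this internally to resolve data files when a folder path is passed:

    import java.util.List;

    import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    import org.apache.carbondata.sdk.file.utils.SDKUtil;

    import org.apache.hadoop.conf.Configuration;

    public class ListDataFilesExample {
      public static void main(String[] args) {
        // Collect every matching file under the folder; a RuntimeException is thrown if nothing is found.
        List<CarbonFile> files =
            SDKUtil.extractFilesFromFolder("/path/to/csv_files", ".csv", new Configuration());
        for (CarbonFile file : files) {
          System.out.println(file.getPath());
        }
      }
    }
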
diff --git
a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 76a7040..dcc874b 100644
---
a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -36,13 +37,26 @@ import
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.io.FileUtils;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.apache.avro.Schema;
public class AvroCarbonWriterTest {
private String path = "./AvroCarbonWriterSuiteWriteFiles";
+ String DATA_PATH = "./src/test/resources/file/";
+
+ @Before
+ @After
+ public void cleanTestData() {
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
@Test
public void testWriteBasic() throws IOException {
@@ -603,4 +617,116 @@ public class AvroCarbonWriterTest {
}
}
+ private void loadAvroFile(String filePath) throws IOException, InvalidLoadOptionException {
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ CarbonWriter carbonWriter = carbonWriterBuilder.withAvroPath(filePath)
+ .outputPath(path).writtenBy("AvroCarbonWriterTest").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(path).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+ }
+
+ @Test
+ public void testAvroFileLoadWithNestedSchema() {
+ String filePath = DATA_PATH + "nested_schema.avro";
+ try {
+ loadAvroFile(filePath);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private void assertFetchedRow(Object[] row, int sum) {
+ Assert.assertTrue(row.length == 3);
+ if (sum % 2 != 0) {
+ Assert.assertEquals(row[0], "Alyssa");
+ Assert.assertNull(((Object[]) row[1])[0]);
+ } else {
+ Assert.assertEquals(row[0], "Ben");
+ Assert.assertEquals(((Object[]) row[1])[0], "red");
+ }
+ }
+
+ @Test
+ public void testLoadingAvroFileAndReadingCarbonFile() throws IOException {
+ String filePath = DATA_PATH + "avro_files/users.avro";
+ CarbonReader carbonReader = null;
+ try {
+ loadAvroFile(filePath);
+ carbonReader = CarbonReader.builder().withFolder(path).build();
+ int sum = 0;
+ while (carbonReader.hasNext()) {
+ sum++;
+ Object[] row = (Object[]) carbonReader.readNextRow();
+ assertFetchedRow(row, sum);
+ }
+ Assert.assertTrue(sum == 2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ if (carbonReader != null) {
+ carbonReader.close();
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleAvroFileLoad() throws IOException {
+ String filePath = DATA_PATH + "avro_files";
+ CarbonReader carbonReader = null;
+ try {
+ loadAvroFile(filePath);
+ carbonReader = CarbonReader.builder().withFolder(path).build();
+ int sum = 0;
+ while (carbonReader.hasNext()) {
+ sum++;
+ Object[] row = (Object[]) carbonReader.readNextRow();
+ assertFetchedRow(row, sum);
+ }
+ Assert.assertTrue(sum == 6);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ if (carbonReader != null) {
+ carbonReader.close();
+ }
+ }
+ }
+
+ @Test
+ public void testSelectedAvroFileLoadInDirectory() throws IOException {
+ String filePath = DATA_PATH + "avro_files";
+ CarbonReader carbonReader = null;
+ try {
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ List<String> fileList = new ArrayList<>();
+ fileList.add("users_2.avro");
+ fileList.add("users.avro");
+ CarbonWriter carbonWriter = carbonWriterBuilder.withAvroPath(filePath, fileList)
+ .outputPath(path).writtenBy("AvroCarbonWriterTest").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(path).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+ carbonReader = CarbonReader.builder().withFolder(path).build();
+ int sum = 0;
+ while (carbonReader.hasNext()) {
+ sum++;
+ Object[] row = (Object[]) carbonReader.readNextRow();
+ assertFetchedRow(row, sum);
+ }
+ Assert.assertTrue(sum == 4);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ if (carbonReader != null) {
+ carbonReader.close();
+ }
+ }
+ }
}
diff --git
a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 7b9c040..883e6fc 100644
---
a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -23,8 +23,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -46,7 +49,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.FileFooter3;
import org.apache.commons.io.FileUtils;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import static org.apache.carbondata.sdk.file.utils.SDKUtil.readObjects;
@@ -55,6 +60,17 @@ import static
org.apache.carbondata.sdk.file.utils.SDKUtil.readObjects;
* Test suite for {@link CSVCarbonWriter}
*/
public class CSVCarbonWriterTest {
+ String path = "./testCsvFileLoad";
+
+ @Before
+ @After
+ public void cleanTestData() {
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
@Test
public void testWriteFiles() throws IOException {
@@ -846,4 +862,160 @@ public class CSVCarbonWriterTest {
}
}
+ private void loadCsvFile(String filePath, Schema schema, Map<String, String> options)
+ throws IOException, InvalidLoadOptionException {
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder().withCsvInput(schema).
+ withCsvPath(filePath).outputPath(path).writtenBy("CSVCarbonWriter");
+ CarbonWriter carbonWriter = null;
+ if (options == null) {
+ carbonWriter = carbonWriterBuilder.build();
+ } else {
+ carbonWriter = carbonWriterBuilder.withLoadOptions(options).build();
+ }
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(path).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+ }
+
+ @Test
+ public void testCsvLoadAndCarbonReadWithPrimitiveType() throws IOException, InvalidLoadOptionException, InterruptedException {
+ String filePath = "./src/test/resources/file/csv_files/primitive_data.csv";
+ Field fields[] = new Field[4];
+ fields[0] = new Field("id", "INT");
+ fields[1] = new Field("country", "STRING");
+ fields[2] = new Field("name", "STRING");
+ fields[3] = new Field("salary", "INT");
+ Schema schema = new Schema(fields);
+ loadCsvFile(filePath, schema, null);
+ CarbonReader reader = CarbonReader.builder("./testCsvFileLoad", "_temp")
+ .projection(new String[]{"id", "country", "name",
"salary"}).build();
+ int rowCount = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ rowCount++;
+ Assert.assertEquals(row[0], rowCount);
+ Assert.assertEquals(row[1], "china");
+ Assert.assertEquals(row[2], "aaa" + rowCount);
+ Assert.assertEquals(row[3], 14999 + rowCount);
+ }
+ assert (rowCount == 10);
+ reader.close();
+ }
+
+ @Test
+ public void testCsvLoadAndCarbonReadWithComplexType() throws IOException, InterruptedException, InvalidLoadOptionException {
+ String filePath = "../../examples/spark/src/main/resources/data.csv";
+ Field fields[] = new Field[11];
+ fields[0] = new Field("shortField", "SHORT");
+ fields[1] = new Field("intField", "INT");
+ fields[2] = new Field("bigintField", "LONG");
+ fields[3] = new Field("doubleField", "DOUBLE");
+ fields[4] = new Field("stringField", "STRING");
+ fields[5] = new Field("timestampfield", "TIMESTAMP");
+ fields[6] = new Field("decimalField", "DECIMAL");
+ fields[7] = new Field("datefield", "DATE");
+ fields[8] = new Field("charField", "VARCHAR");
+ fields[9] = new Field("floatField", "FLOAT");
+
+ StructField[] structFields = new StructField[3];
+ structFields[0] = new StructField("fooField", DataTypes.STRING);
+ structFields[1] = new StructField("barField", DataTypes.STRING);
+ structFields[2] = new StructField("worldField", DataTypes.STRING);
+ Field structType = new Field("structField", "struct", Arrays.asList(structFields));
+
+ fields[10] = structType;
+ Map<String, String> options = new HashMap<>();
+ options.put("timestampformat", "yyyy/MM/dd HH:mm:ss");
+ options.put("dateformat", "yyyy/MM/dd");
+ options.put("complex_delimiter_level_1", "#");
+ Schema schema = new Schema(fields);
+ loadCsvFile(filePath, schema, options);
+
+ CarbonReader reader = CarbonReader.builder("./testCsvFileLoad", "_temp")
+ .projection(new String[]{"structfield"}).build();
+ int rowCount = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ Object[] structCol = (Object[]) row[0];
+ assert (structCol.length == 3);
+ Assert.assertEquals(structCol[0], "'foo'");
+ Assert.assertEquals(structCol[1], "'bar'");
+ Assert.assertEquals(structCol[2], "'world'");
+ rowCount++;
+ }
+ assert (rowCount == 10);
+ reader.close();
+ }
+
+ @Test
+ public void testMultipleCsvFileLoad() throws IOException, InvalidLoadOptionException, InterruptedException {
+ String filePath = "./src/test/resources/file/csv_files";
+ Field fields[] = new Field[4];
+ fields[0] = new Field("id", "INT");
+ fields[1] = new Field("country", "STRING");
+ fields[2] = new Field("name", "STRING");
+ fields[3] = new Field("salary", "INT");
+ Schema schema = new Schema(fields);
+ loadCsvFile(filePath, schema, null);
+
+ CarbonReader reader = CarbonReader.builder("./testCsvFileLoad", "_temp")
+ .projection(new String[]{"id", "country", "name",
"salary"}).build();
+ int rowCount = 0;
+ List<Object[]> rowDatas = new ArrayList<>();
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ rowDatas.add(row);
+ }
+ rowDatas.sort((row1, row2) -> {
+ if ((int) row1[0] == (int) row2[0]) {
+ return 0;
+ }
+ return ((int) row1[0] < (int) row2[0]) ? -1 : 1;
+ });
+ for (Object[] row : rowDatas) {
+ rowCount++;
+ Assert.assertEquals(row[0], rowCount);
+ Assert.assertEquals(row[1], "china");
+ Assert.assertEquals(row[2], "aaa" + rowCount);
+ Assert.assertEquals(row[3], 14999 + rowCount);
+ }
+ assert (rowCount == 30);
+ reader.close();
+ }
+
+ @Test
+ public void testSelectedCsvFileLoadInDirectory() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = "./src/test/resources/file/csv_files";
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ Field fields[] = new Field[4];
+ fields[0] = new Field("id", "INT");
+ fields[1] = new Field("country", "STRING");
+ fields[2] = new Field("name", "STRING");
+ fields[3] = new Field("salary", "INT");
+ List<String> fileList = new ArrayList<>();
+ fileList.add("primitive_data_2.csv");
+ fileList.add("primitive_data_3.csv");
+ CarbonWriter carbonWriter = carbonWriterBuilder.withCsvInput(new Schema(fields)).
+ withCsvPath(filePath, fileList).outputPath(path).writtenBy("CSVCarbonWriter").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(path).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+
+ CarbonReader reader = CarbonReader.builder("./testCsvFileLoad", "_temp")
+ .projection(new String[]{"id", "country", "name",
"salary"}).build();
+ int id = 10;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ id++;
+ Assert.assertEquals(row[0], id);
+ Assert.assertEquals(row[1], "china");
+ Assert.assertEquals(row[2], "aaa" + id);
+ Assert.assertEquals(row[3], 14999 + id);
+ }
+ assert (id == 30);
+ reader.close();
+ }
}
diff --git
a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/JSONCarbonWriterTest.java
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/JSONCarbonWriterTest.java
new file mode 100644
index 0000000..a3b226f
--- /dev/null
+++
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/JSONCarbonWriterTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Test suite for {@link JsonCarbonWriter}
+ */
+public class JSONCarbonWriterTest {
+ String path = "./testLoadJsonFIle";
+
+ @Before
+ @After
+ public void cleanTestData() {
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private Schema buildSchema() {
+ Field[] fields = new Field[9];
+ fields[0] = new Field("stringField", "STRING");
+ fields[1] = new Field("intField", "INT");
+ fields[2] = new Field("shortField", "SHORT");
+ fields[3] = new Field("longField", "LONG");
+ fields[4] = new Field("doubleField", "DOUBLE");
+ fields[5] = new Field("boolField", "BOOLEAN");
+ fields[6] = new Field("dateField", "DATE");
+ fields[7] = new Field("timeField", "TIMESTAMP");
+ fields[8] = new Field("decimalField", "DECIMAL");
+ return new Schema(fields);
+ }
+
+ private void loadJsonFile(String filePath, Schema schema)
+ throws IOException, InvalidLoadOptionException {
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ CarbonWriter carbonWriter = carbonWriterBuilder.withJsonPath(filePath).outputPath(path)
+ .withJsonInput(schema).writtenBy("JSONCarbonWriterTest").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(path).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+ }
+
+ @Test
+ public void testJsonFileLoadSingleRow() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = "./src/test/resources/file/json_files/allPrimitiveType.json";
+ Schema schema = buildSchema();
+ loadJsonFile(filePath, schema);
+
+ CarbonReader reader = CarbonReader.builder("./testLoadJsonFIle", "_temp")
+ .projection(new String[]{"stringField", "boolField", "decimalField",
"longField"}).build();
+ int id = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ Assert.assertEquals(row[0], "nihal\"ojha\"");
+ Assert.assertEquals(row[1], false);
+ Assert.assertEquals(row[2], (new BigDecimal("55.35")).setScale(2, BigDecimal.ROUND_FLOOR));
+ Assert.assertEquals(row[3], (long) 1234567);
+ id++;
+ }
+ assert (id == 1);
+ reader.close();
+ }
+
+ @Test
+ public void testJsonFileLoadWithMultipleRow() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = "./src/test/resources/file/json_files/allPrimitiveTypeMultipleRows.json";
+ Schema schema = buildSchema();
+ loadJsonFile(filePath, schema);
+ CarbonReader reader = CarbonReader.builder("./testLoadJsonFIle", "_temp")
+ .projection(new String[]{"stringField", "boolField", "shortField",
"doubleField"}).build();
+ int count = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ if (count == 0) {
+ Assert.assertEquals(row[0], "nihal");
+ Assert.assertEquals(row[1], false);
+ Assert.assertEquals((((Short) row[2]).intValue()), 26);
+ Assert.assertEquals(row[3], 23.3333);
+ } else if (count == 1) {
+ Assert.assertEquals(row[0], "ab");
+ Assert.assertEquals(row[1], false);
+ Assert.assertEquals((((Short) row[2]).intValue()), 25);
+ Assert.assertEquals(row[3], 23.3333);
+ }
+ count++;
+ }
+ assert (count == 4);
+ reader.close();
+ }
+
+ @Test
+ public void testJsonFileLoadWithComplexSchema() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = "../../integration/spark/src/test/resources/jsonFiles/" +
+ "data/arrayOfarrayOfarrayOfStruct.json";
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", "STRING");
+ fields[1] = new Field("age", "INT");
+
+ StructField[] structFields = new StructField[2];
+ structFields[0] = new StructField("street", DataTypes.STRING);
+ structFields[1] = new StructField("city", DataTypes.STRING);
+ Field structType = new Field("structField", "struct", Arrays.asList(structFields));
+
+ List<StructField> arrayField3 = new ArrayList<>();
+ arrayField3.add(new StructField("structField", structType.getDataType()
+ , Arrays.asList(structFields)));
+ Field arrayLevel3 = new Field("arr3", "array", arrayField3);
+
+ List<StructField> arrayField2 = new ArrayList<>();
+ arrayField2.add(new StructField("arrayField3", arrayLevel3.getDataType(), arrayField3));
+ Field arrayLevel2 = new Field("arr2", "array", arrayField2);
+
+ List<StructField> arrayField = new ArrayList<>();
+ arrayField.add(new StructField("arrayField2", arrayLevel2.getDataType(), arrayField2));
+ Field arrayLevel1 = new Field("BuildNum", "array", arrayField);
+
+ fields[2] = arrayLevel1;
+ Schema schema = new Schema(fields);
+ loadJsonFile(filePath, schema);
+
+ CarbonReader reader = CarbonReader.builder("./testLoadJsonFIle", "_temp")
+ .projection(new String[]{"name", "age", "BuildNum"}).build();
+ int count = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ Assert.assertEquals(row[0], "ajantha");
+ Assert.assertEquals(row[1], 26);
+ Object[] outerArray = (Object[]) row[2];
+ Assert.assertEquals(outerArray.length, 2);
+ Object[] middleArray = (Object[]) outerArray[0];
+ Assert.assertEquals(middleArray.length, 2);
+ Object[] innerArray1 = (Object[]) middleArray[0];
+ Assert.assertEquals(innerArray1.length, 3);
+ Object[] structField1 = (Object[]) innerArray1[0];
+ Assert.assertEquals(structField1.length, 2);
+ Assert.assertEquals(structField1[0], "abc");
+ Assert.assertEquals(structField1[1], "city1");
+ Object[] structField2 = (Object[]) innerArray1[1];
+ Assert.assertEquals(structField2.length, 2);
+ Assert.assertEquals(structField2[0], "def");
+ Assert.assertEquals(structField2[1], "city2");
+ count++;
+ }
+ assert (count == 1);
+ reader.close();
+ }
+
+ @Test
+ public void testMultipleJsonFileLoad() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = "./src/test/resources/file/json_files";
+ Schema schema = buildSchema();
+ loadJsonFile(filePath, schema);
+ CarbonReader reader = CarbonReader.builder("./testLoadJsonFIle", "_temp")
+ .projection(new String[]{"longField", "boolField", "decimalField",
"doubleField"}).build();
+ int count = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ Assert.assertEquals(row[0], (long) 1234567);
+ Assert.assertEquals(row[1], false);
+ Assert.assertEquals(row[2], (new BigDecimal("55.35")).setScale(2, BigDecimal.ROUND_FLOOR));
+ Assert.assertEquals(row[3], 23.3333);
+ count++;
+ }
+ assert (count == 6);
+ reader.close();
+ }
+
+ @Test
+ public void testSelectedJsonFileLoadInDirectory() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = "./src/test/resources/file/json_files";
+ Schema schema = buildSchema();
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ List<String> fileList = new ArrayList<>();
+ fileList.add("allPrimitiveType.json");
+ fileList.add("allPrimitiveTypeMultipleRows.json");
+ CarbonWriter carbonWriter = carbonWriterBuilder.withJsonPath(filePath, fileList).outputPath(path)
+ .withJsonInput(schema).writtenBy("JSONCarbonWriterTest").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(path).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+ CarbonReader reader = CarbonReader.builder("./testLoadJsonFIle", "_temp")
+ .projection(new String[]{"longField", "boolField", "decimalField",
"doubleField"}).build();
+ int count = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ Assert.assertEquals(row[0], (long) 1234567);
+ Assert.assertEquals(row[1], false);
+ Assert.assertEquals(row[2], (new BigDecimal("55.35")).setScale(2, BigDecimal.ROUND_FLOOR));
+ Assert.assertEquals(row[3], 23.3333);
+ count++;
+ }
+ assert (count == 5);
+ reader.close();
+ }
+}
\ No newline at end of file
diff --git
a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ORCCarbonWriterTest.java
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ORCCarbonWriterTest.java
new file mode 100644
index 0000000..cc72945
--- /dev/null
+++
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ORCCarbonWriterTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link ORCCarbonWriter}
+ */
+public class ORCCarbonWriterTest {
+ String DATA_PATH = "./src/test/resources/file/";
+ String outputPath = "./testloadORCFiles";
+
+ @Before
+ @After
+ public void cleanTestData() {
+ try {
+ FileUtils.deleteDirectory(new File(outputPath));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void loadOrcFile(String path) throws IOException, InvalidLoadOptionException {
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ CarbonWriter carbonWriter = carbonWriterBuilder.withOrcPath(path)
+ .outputPath(outputPath).writtenBy("ORCCarbonWriter")
+ .withLoadOption("bad_records_action", "force").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ }
+
+ @Test
+ public void testORCFileLoadWithPrimitiveType() throws IOException, InvalidLoadOptionException {
+ String filePath = DATA_PATH + "userdata1_orc";
+ loadOrcFile(filePath);
+ }
+
+ @Test
+ public void testORCFileLoadWithoutStructSchema() throws IOException, InvalidLoadOptionException {
+ String filePath = DATA_PATH + "testTimestamp.orc";
+ loadOrcFile(filePath);
+ }
+
+ @Test
+ public void testORCFileLoadWithComplexSchema() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = DATA_PATH + "orc_files/sample.orc";
+ loadOrcFile(filePath);
+ CarbonReader reader = CarbonReader.builder(outputPath, "_temp")
+ .projection(new String[]{"string1", "list"}).build();
+ int count = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ count++;
+ if (count == 1) {
+ assertFetchedRow(row);
+ }
+ }
+ assert (count == 2);
+ reader.close();
+ }
+
+ private void assertFetchedRow(Object[] row) {
+ Assert.assertEquals(row[0], "hi");
+ Object[] list = (Object[]) row[1];
+ assert (list.length == 2);
+ Object[] first = (Object[]) list[0];
+ Assert.assertEquals(first[1], "good");
+ Assert.assertEquals(first[0], 3);
+ Object[] second = (Object[]) list[1];
+ Assert.assertEquals(second[1], "bad");
+ Assert.assertEquals(second[0], 4);
+ }
+
+ @Test
+ public void testMultipleORCFileLoad() throws IOException,
+ InvalidLoadOptionException, InterruptedException {
+ String filePath = DATA_PATH + "orc_files";
+ loadOrcFile(filePath);
+ CarbonReader reader = CarbonReader.builder(outputPath, "_temp")
+ .projection(new String[]{"string1", "list"}).build();
+ int count = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ count++;
+ if (count % 2 == 1) {
+ assertFetchedRow(row);
+ }
+ }
+ assert (count == 6);
+ reader.close();
+ }
+
+ @Test
+ public void testOrcFileLoadAndCarbonRead() throws IOException,
+ InterruptedException, InvalidLoadOptionException {
+ String filePath = DATA_PATH + "orc_files";
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ List<String> fileList = new ArrayList<>();
+ fileList.add("sample.orc");
+ fileList.add("sample_3.orc");
+ CarbonWriter carbonWriter = carbonWriterBuilder.withOrcPath(filePath, fileList).
+ outputPath(outputPath).writtenBy("ORCCarbonWriter")
+ .withLoadOption("bad_records_action", "force").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(outputPath).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+ CarbonReader reader = CarbonReader.builder(outputPath, "_temp")
+ .projection(new String[]{"string1", "list"}).build();
+ int count = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ count++;
+ if (count % 2 == 1) {
+ assertFetchedRow(row);
+ }
+ }
+ assert (count == 4);
+ reader.close();
+ }
+}
diff --git
a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ParquetCarbonWriterTest.java
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ParquetCarbonWriterTest.java
new file mode 100644
index 0000000..95ea4a9
--- /dev/null
+++
b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ParquetCarbonWriterTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link ParquetCarbonWriter}
+ */
+public class ParquetCarbonWriterTest {
+ String DATA_PATH = "./src/test/resources/file/";
+ String outputPath = "./testWriteFiles";
+
+ @Before
+ @After
+ public void cleanTestData() {
+ try {
+ FileUtils.deleteDirectory(new File(outputPath));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void loadParquetFile(String filePath) throws IOException, InvalidLoadOptionException {
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ CarbonWriter carbonWriter = carbonWriterBuilder.withParquetPath(filePath).outputPath(outputPath)
+ .writtenBy("ParquetCarbonWriterTest").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(outputPath).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+ }
+
+ @Test
+ public void testParquetFileLoadWithNestedMap() throws IOException, InvalidLoadOptionException {
+ String filePath = DATA_PATH + "NestedMap.parquet";
+ loadParquetFile(filePath);
+ }
+
+ @Test
+ public void testParquetFileLoadWithMixSchema() throws IOException, InvalidLoadOptionException {
+ String filePath = DATA_PATH + "parquet_files/file1.parquet";
+ loadParquetFile(filePath);
+ }
+
+ @Test
+ public void testParquetFileWithInt96() throws IOException, InvalidLoadOptionException {
+ String filePath = DATA_PATH + "userdata1.parquet";
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ try {
+ carbonWriterBuilder.withParquetPath(filePath).outputPath(outputPath)
+ .writtenBy("ParquetFileLoader").build();
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals("INT96 not implemented and is deprecated",
e.getMessage());
+ }
+ }
+
+ @Test
+ public void testParquetFileWithRepeatedSchema() throws IOException, InvalidLoadOptionException {
+ String filePath = DATA_PATH + "repeated-schema.parquet";
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ try {
+ carbonWriterBuilder.withParquetPath(filePath).outputPath(outputPath)
+ .writtenBy("ParquetCarbonWriterTest").build();
+ } catch (UnsupportedOperationException e) {
+ assert (e.getMessage().contains("REPEATED not supported outside LIST or MAP."));
+ }
+ }
+
+ private int buildCarbonReaderAndFetchRecord() throws IOException, InterruptedException {
+ CarbonReader reader = CarbonReader
+ .builder(outputPath, "_temp")
+ .projection(new String[]{"file"})
+ .build();
+ int rowCount = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ Assert.assertEquals(((Object[]) row[0])[0], "file1.parquet");
+ rowCount++;
+ }
+ reader.close();
+ return rowCount;
+ }
+
+ @Test
+ public void testParquetLoadAndCarbonRead() throws Exception {
+ String filePath = DATA_PATH + "parquet_files/file1.parquet";
+ loadParquetFile(filePath);
+ int i = buildCarbonReaderAndFetchRecord();
+ Assert.assertEquals(i, 1);
+ }
+
+ @Test
+ public void testMultipleParquetFileLoad() throws IOException, InvalidLoadOptionException, InterruptedException {
+ String filePath = DATA_PATH + "parquet_files";
+ loadParquetFile(filePath);
+ int numberOfRow = buildCarbonReaderAndFetchRecord();
+ Assert.assertEquals(numberOfRow, 3);
+ }
+
+ @Test
+ public void testSelectedParquetFileLoadInDirectory() throws IOException, InvalidLoadOptionException, InterruptedException {
+ String filePath = DATA_PATH + "parquet_files";
+ CarbonWriterBuilder carbonWriterBuilder = new CarbonWriterBuilder();
+ List<String> fileList = new ArrayList<>();
+ fileList.add("file1.parquet");
+ fileList.add("file3.parquet");
+ CarbonWriter carbonWriter = carbonWriterBuilder.withParquetPath(filePath, fileList)
+ .outputPath(outputPath).writtenBy("ParquetCarbonWriterTest").build();
+ carbonWriter.write();
+ carbonWriter.close();
+ File[] dataFiles = new File(outputPath).listFiles();
+ assert (Objects.requireNonNull(dataFiles).length == 2);
+
+ int numberOfRow = buildCarbonReaderAndFetchRecord();
+ Assert.assertEquals(numberOfRow, 2);
+ }
+}
diff --git a/sdk/sdk/src/test/resources/file/NestedMap.parquet
b/sdk/sdk/src/test/resources/file/NestedMap.parquet
new file mode 100644
index 0000000..a43c329
Binary files /dev/null and b/sdk/sdk/src/test/resources/file/NestedMap.parquet
differ
diff --git a/sdk/sdk/src/test/resources/file/avro_files/users.avro
b/sdk/sdk/src/test/resources/file/avro_files/users.avro
new file mode 100644
index 0000000..27c526a
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/avro_files/users.avro differ
diff --git a/sdk/sdk/src/test/resources/file/avro_files/users_2.avro
b/sdk/sdk/src/test/resources/file/avro_files/users_2.avro
new file mode 100644
index 0000000..27c526a
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/avro_files/users_2.avro differ
diff --git a/sdk/sdk/src/test/resources/file/avro_files/users_3.avro
b/sdk/sdk/src/test/resources/file/avro_files/users_3.avro
new file mode 100644
index 0000000..27c526a
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/avro_files/users_3.avro differ
diff --git a/sdk/sdk/src/test/resources/file/csv_files/primitive_data.csv
b/sdk/sdk/src/test/resources/file/csv_files/primitive_data.csv
new file mode 100644
index 0000000..61bd4e8
--- /dev/null
+++ b/sdk/sdk/src/test/resources/file/csv_files/primitive_data.csv
@@ -0,0 +1,11 @@
+ID,country,name,salary
+1,china,aaa1,15000
+2,china,aaa2,15001
+3,china,aaa3,15002
+4,china,aaa4,15003
+5,china,aaa5,15004
+6,china,aaa6,15005
+7,china,aaa7,15006
+8,china,aaa8,15007
+9,china,aaa9,15008
+10,china,aaa10,15009
diff --git a/sdk/sdk/src/test/resources/file/csv_files/primitive_data_2.csv
b/sdk/sdk/src/test/resources/file/csv_files/primitive_data_2.csv
new file mode 100644
index 0000000..18a119a
--- /dev/null
+++ b/sdk/sdk/src/test/resources/file/csv_files/primitive_data_2.csv
@@ -0,0 +1,11 @@
+ID,country,name,salary
+11,china,aaa11,15010
+12,china,aaa12,15011
+13,china,aaa13,15012
+14,china,aaa14,15013
+15,china,aaa15,15014
+16,china,aaa16,15015
+17,china,aaa17,15016
+18,china,aaa18,15017
+19,china,aaa19,15018
+20,china,aaa20,15019
diff --git a/sdk/sdk/src/test/resources/file/csv_files/primitive_data_3.csv
b/sdk/sdk/src/test/resources/file/csv_files/primitive_data_3.csv
new file mode 100644
index 0000000..aa52009
--- /dev/null
+++ b/sdk/sdk/src/test/resources/file/csv_files/primitive_data_3.csv
@@ -0,0 +1,11 @@
+ID,country,name,salary
+21,china,aaa21,15020
+22,china,aaa22,15021
+23,china,aaa23,15022
+24,china,aaa24,15023
+25,china,aaa25,15024
+26,china,aaa26,15025
+27,china,aaa27,15026
+28,china,aaa28,15027
+29,china,aaa29,15028
+30,china,aaa30,15029
diff --git a/sdk/sdk/src/test/resources/file/json_files/allPrimitiveType.json
b/sdk/sdk/src/test/resources/file/json_files/allPrimitiveType.json
new file mode 100644
index 0000000..9d22d2a
--- /dev/null
+++ b/sdk/sdk/src/test/resources/file/json_files/allPrimitiveType.json
@@ -0,0 +1,11 @@
+{
+ "stringField": "nihal\"ojha\"",
+ "intField": 26,
+ "shortField": 26,
+ "longField": 1234567,
+ "doubleField": 23.3333,
+ "boolField": false,
+ "dateField": "2019-03-02",
+ "timeField": "2019-02-12 03:03:34",
+ "decimalField" : 55.35
+}
diff --git
a/sdk/sdk/src/test/resources/file/json_files/allPrimitiveTypeMultipleRows.json
b/sdk/sdk/src/test/resources/file/json_files/allPrimitiveTypeMultipleRows.json
new file mode 100644
index 0000000..40e5897
--- /dev/null
+++
b/sdk/sdk/src/test/resources/file/json_files/allPrimitiveTypeMultipleRows.json
@@ -0,0 +1,46 @@
+[
+ {
+ "stringField": "nihal",
+ "intField": 26,
+ "shortField": 26,
+ "longField": 1234567,
+ "doubleField": 23.3333,
+ "boolField": false,
+ "dateField": "2019-03-02",
+ "timeField": "2019-02-12 03:03:34",
+ "decimalField": 55.35
+ },
+ {
+ "stringField": "ab",
+ "intField": 25,
+ "shortField": 25,
+ "longField": 1234567,
+ "doubleField": 23.3333,
+ "boolField": false,
+ "dateField": "2018-03-02",
+ "timeField": "2018-02-12 03:03:34",
+ "decimalField": 55.35
+ },
+ {
+ "stringField": "cd",
+ "intField": 24,
+ "shortField": 24,
+ "longField": 1234567,
+ "doubleField": 23.3333,
+ "boolField": false,
+ "dateField": "2017-03-02",
+ "timeField": "2017-02-12 03:03:34",
+ "decimalField": 55.35
+ },
+ {
+ "stringField": "ef",
+ "intField": 23,
+ "shortField": 23,
+ "longField": 1234567,
+ "doubleField": 23.3333,
+ "boolField": false,
+ "dateField": "2016-03-02",
+ "timeField": "2016-02-12 03:03:34",
+ "decimalField": 55.35
+ }
+]
diff --git
a/sdk/sdk/src/test/resources/file/json_files/allPrimitiveTypeSingleArray.json
b/sdk/sdk/src/test/resources/file/json_files/allPrimitiveTypeSingleArray.json
new file mode 100644
index 0000000..ecd6fe2
--- /dev/null
+++
b/sdk/sdk/src/test/resources/file/json_files/allPrimitiveTypeSingleArray.json
@@ -0,0 +1,13 @@
+[
+ {
+ "stringField": "ZZ",
+ "intField": 100,
+ "shortField": 100,
+ "longField": 1234567,
+ "doubleField": 23.3333,
+ "boolField": false,
+ "dateField": "2020-03-02",
+ "timeField": "2020-02-12 03:03:34",
+ "decimalField": 55.35
+ }
+]
diff --git a/sdk/sdk/src/test/resources/file/nested_schema.avro
b/sdk/sdk/src/test/resources/file/nested_schema.avro
new file mode 100644
index 0000000..2bb3dff
--- /dev/null
+++ b/sdk/sdk/src/test/resources/file/nested_schema.avro
@@ -0,0 +1,91 @@
+Objavro.schema��{"type":"record","name":"top_level","namespace":"com.facebook.test","fields":[{"name":"string_col","type":"string"},{"name":"long_record","type":{"type":"record","name":"long_record_type","fields":[{"name":"record_field","type":"string"},{"name":"record_field0","type":"string"},{"name":"record_field1","type":"string"},{"name":"record_field2","type":"string"},{"name":"record_field3","type":"string"},{"name":"record_field4","type":"string"},{"name":"record_field5","type
[...]
+val10
+val11
+val12
+val13
+val14
+val15
+val16
+val17
+val18
+val19
+val20
+val21
+val22
+val23
+val24
+val25
+val26
+val27
+val28
+val29
+val30
+val31
+val32
+val33
+val34
+val35
+val36
+val37
+val38
+val39
+val40
+val41
+val42
+val43
+val44
+val45
+val46
+val47
+val48
+val49
+val50
+val51
+val52
+val53
+val54
+val55
+val56
+val57
+val58
+val59
+val60
+val61
+val62
+val63
+val64
+val65
+val66
+val67
+val68
+val69
+val70
+val71
+val72
+val73
+val74
+val75
+val76
+val77
+val78
+val79
+val80
+val81
+val82
+val83
+val84
+val85
+val86
+val87
+val88
+val89
+val90
+val91
+val92
+val93
+val94
+val95
+val96
+val97
+val98
+val99val100val101val102val103val104val105val106val107val108val109
val110val111val112val113val114val115val116val117val118val119val120
val121val122val123val124val125val126val127val128val129val130val131
val132val133val134val135val136val137val138val139val140val141val142
val143val144val145val146val147val148val149val150val151val152val153
val154val155val156val157val158val159val160val161val162val163val164
val165val166val167val168val [...]
\ No newline at end of file
diff --git a/sdk/sdk/src/test/resources/file/orc_files/sample.orc
b/sdk/sdk/src/test/resources/file/orc_files/sample.orc
new file mode 100644
index 0000000..4fb0bef
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/orc_files/sample.orc differ
diff --git a/sdk/sdk/src/test/resources/file/orc_files/sample_2.orc
b/sdk/sdk/src/test/resources/file/orc_files/sample_2.orc
new file mode 100644
index 0000000..4fb0bef
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/orc_files/sample_2.orc differ
diff --git a/sdk/sdk/src/test/resources/file/orc_files/sample_3.orc
b/sdk/sdk/src/test/resources/file/orc_files/sample_3.orc
new file mode 100644
index 0000000..4fb0bef
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/orc_files/sample_3.orc differ
diff --git a/sdk/sdk/src/test/resources/file/parquet_files/file1.parquet
b/sdk/sdk/src/test/resources/file/parquet_files/file1.parquet
new file mode 100644
index 0000000..ad360b1
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/parquet_files/file1.parquet differ
diff --git a/sdk/sdk/src/test/resources/file/parquet_files/file2.parquet
b/sdk/sdk/src/test/resources/file/parquet_files/file2.parquet
new file mode 100644
index 0000000..ad360b1
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/parquet_files/file2.parquet differ
diff --git a/sdk/sdk/src/test/resources/file/parquet_files/file3.parquet
b/sdk/sdk/src/test/resources/file/parquet_files/file3.parquet
new file mode 100644
index 0000000..ad360b1
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/parquet_files/file3.parquet differ
diff --git a/sdk/sdk/src/test/resources/file/repeated-schema.parquet
b/sdk/sdk/src/test/resources/file/repeated-schema.parquet
new file mode 100644
index 0000000..8a7eea6
Binary files /dev/null and
b/sdk/sdk/src/test/resources/file/repeated-schema.parquet differ
diff --git a/sdk/sdk/src/test/resources/file/testTimestamp.orc
b/sdk/sdk/src/test/resources/file/testTimestamp.orc
new file mode 100644
index 0000000..505d42c
Binary files /dev/null and b/sdk/sdk/src/test/resources/file/testTimestamp.orc
differ
diff --git a/sdk/sdk/src/test/resources/file/userdata1.avro
b/sdk/sdk/src/test/resources/file/userdata1.avro
new file mode 100644
index 0000000..c6b8e90
Binary files /dev/null and b/sdk/sdk/src/test/resources/file/userdata1.avro
differ
diff --git a/sdk/sdk/src/test/resources/file/userdata1.parquet
b/sdk/sdk/src/test/resources/file/userdata1.parquet
new file mode 100644
index 0000000..2ae23da
Binary files /dev/null and b/sdk/sdk/src/test/resources/file/userdata1.parquet
differ
diff --git a/sdk/sdk/src/test/resources/file/userdata1_orc
b/sdk/sdk/src/test/resources/file/userdata1_orc
new file mode 100644
index 0000000..db3d176
Binary files /dev/null and b/sdk/sdk/src/test/resources/file/userdata1_orc
differ
diff --git a/sdk/sdk/src/test/resources/file/weather.avro
b/sdk/sdk/src/test/resources/file/weather.avro
new file mode 100644
index 0000000..b5b6b8a
Binary files /dev/null and b/sdk/sdk/src/test/resources/file/weather.avro differ