[CARBONDATA-2253][SDK] Support writing JSON/Avro data to carbon files

This PR adds AvroCarbonWriter to the SDK; it can be used to write JSON or Avro data to carbon files.
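For illustration, a minimal usage sketch in Java. It mirrors the builder API and the JsonAvroConverter flow from AvroCarbonWriterTest in this patch; the output path "./output" is a hypothetical example, and checked exceptions (IOException, InvalidLoadOptionException) are omitted for brevity:

    import org.apache.avro.generic.GenericData;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

    // Avro schema describing the records to write
    String avroSchema = "{"
        + " \"type\": \"record\", \"name\": \"Acme\", \"fields\": ["
        + " { \"name\": \"name\", \"type\": \"string\" },"
        + " { \"name\": \"age\", \"type\": \"int\" } ]"
        + "}";

    // JSON input is first converted to an Avro GenericData.Record
    String json = "{\"name\":\"bob\", \"age\":10}";
    JsonAvroConverter converter = new JsonAvroConverter();
    GenericData.Record record = converter.convertToGenericDataRecord(
        json.getBytes("UTF-8"), new org.apache.avro.Schema.Parser().parse(avroSchema));

    // Build an AvroCarbonWriter through the new builder entry point
    Field[] fields = new Field[]{
        new Field("name", DataTypes.STRING),
        new Field("age", DataTypes.INT)};
    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
        .outputPath("./output")              // hypothetical output directory
        .buildWriterForAvroInput();
    writer.write(record);                    // accepts GenericData.Record
    writer.close();                          // flushes and closes the underlying RecordWriter

Internally, AvroCarbonWriter converts each Avro record to a CSV-style String[] and hands it to CarbonTableOutputFormat; complex Avro types are not yet supported (see the TODO in avroFieldToString).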
This closes #2061 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e39b0a14 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e39b0a14 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e39b0a14 Branch: refs/heads/carbonfile Commit: e39b0a14a196224dbe6fdce2ff53e09b3b76b876 Parents: 04ff367 Author: Jacky Li <[email protected]> Authored: Wed Mar 14 00:06:37 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Fri Mar 16 23:02:37 2018 +0800 ---------------------------------------------------------------------- pom.xml | 6 - store/sdk/pom.xml | 95 +------ .../carbondata/sdk/file/AvroCarbonWriter.java | 125 +++++++++ .../sdk/file/CarbonWriterBuilder.java | 29 +- .../sdk/file/AvroCarbonWriterTest.java | 104 ++++++++ .../sdk/file/CSVCarbonWriterSuite.java | 267 ------------------- .../sdk/file/CSVCarbonWriterTest.java | 267 +++++++++++++++++++ 7 files changed, 519 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 972be1e..287e052 100644 --- a/pom.xml +++ b/pom.xml @@ -579,12 +579,6 @@ <id>include-all</id> </profile> <profile> - <id>store-sdk</id> - <modules> - <module>store/sdk</module> - </modules> - </profile> - <profile> <id>sdvtest</id> <modules> <module>integration/spark-common-cluster-test</module> http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index 9f7038a..1d1735e 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -25,55 +25,27 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> + <groupId>tech.allegro.schema.json2avro</groupId> + <artifactId>converter</artifactId> + <version>0.2.5</version> <scope>test</scope> </dependency> <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> + <groupId>junit</groupId> + <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <build> + <sourceDirectory>src/main/java</sourceDirectory> <resources> <resource> - <directory>src/resources</directory> - </resource> - <resource> <directory>.</directory> </resource> </resources> <plugins> <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <version>2.15.2</version> - <executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - <execution> - <id>testCompile</id> - <goals> - <goal>testCompile</goal> - </goals> - <phase>test</phase> - </execution> - <execution> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> @@ -82,61 +54,6 @@ </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.18</version> - <!-- Note config is repeated in scalatest config --> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <argLine>-Xmx3g -XX:MaxPermSize=512m 
-XX:ReservedCodeCacheSize=512m</argLine> - <systemProperties> - <java.awt.headless>true</java.awt.headless> - </systemProperties> - <failIfNoTests>false</failIfNoTests> - </configuration> - </plugin> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <version>1.0</version> - <!-- Note config is repeated in surefire config --> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <junitxml>.</junitxml> - <filereports>CarbonTestSuite.txt</filereports> - <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m - </argLine> - <stderr /> - <environmentVariables> - </environmentVariables> - <systemProperties> - <java.awt.headless>true</java.awt.headless> - </systemProperties> - </configuration> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-install-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java new file mode 100644 index 0000000..e88164c --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Writer Implementation to write Avro Record to carbondata file. + */ [email protected] +class AvroCarbonWriter extends CarbonWriter { + + private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; + private TaskAttemptContext context; + private ObjectArrayWritable writable; + private Schema avroSchema; + + AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException { + Configuration hadoopConf = new Configuration(); + CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel); + CarbonTableOutputFormat format = new CarbonTableOutputFormat(); + JobID jobId = new JobID(UUID.randomUUID().toString(), 0); + Random random = new Random(); + TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); + TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, attemptID); + this.recordWriter = format.getRecordWriter(context); + this.context = context; + this.writable = new ObjectArrayWritable(); + } + + private String[] avroToCsv(GenericData.Record avroRecord) { + if (avroSchema == null) { + avroSchema = avroRecord.getSchema(); + } + List<Schema.Field> fields = avroSchema.getFields(); + String[] csvField = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i)); + } + return csvField; + } + + private String avroFieldToString(Schema.Field fieldType, Object fieldValue) { + StringBuilder out = new StringBuilder(); + Schema.Type type = fieldType.schema().getType(); + switch (type) { + case BOOLEAN: + case INT: + case LONG: + case DOUBLE: + case STRING: + out.append(fieldValue.toString()); + break; + default: + throw new UnsupportedOperationException(); + // TODO: convert complex type + } + return out.toString(); + } + + /** + * Write single row data, input row is Avro Record + */ + @Override + public void write(Object object) throws IOException { + GenericData.Record record = (GenericData.Record) object; + + // convert Avro record to CSV String[] + String[] csvRecord = avroToCsv(record); + writable.set(csvRecord); + try { + recordWriter.write(NullWritable.get(), writable); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Flush and close the writer + */ + @Override + public void close() throws IOException { + try { + recordWriter.close(context); + } catch (InterruptedException e) { + throw new IOException(e); + } + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 8734341..5be60c4 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -104,7 +104,23 @@ public class CarbonWriterBuilder { public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException { Objects.requireNonNull(schema, "schema should not be null"); Objects.requireNonNull(path, "path should not be null"); + CarbonLoadModel loadModel = createLoadModel(); + return new CSVCarbonWriter(loadModel); + } + + /** + * Build a {@link CarbonWriter}, which accepts Avro object + * @return + * @throws IOException + */ + public CarbonWriter buildWriterForAvroInput() throws IOException, InvalidLoadOptionException { + Objects.requireNonNull(schema, "schema should not be null"); + Objects.requireNonNull(path, "path should not be null"); + CarbonLoadModel loadModel = createLoadModel(); + return new AvroCarbonWriter(loadModel); + } + private CarbonLoadModel createLoadModel() throws IOException, InvalidLoadOptionException { // build CarbonTable using schema CarbonTable table = buildCarbonTable(); if (persistSchemaFile) { @@ -113,18 +129,7 @@ public class CarbonWriterBuilder { } // build LoadModel - CarbonLoadModel loadModel = buildLoadModel(table); - return new CSVCarbonWriter(loadModel); - } - - /** - * Build a {@link CarbonWriter}, which accepts Avro object - * @return - * @throws IOException - */ - public CarbonWriter buildWriterForAvroInput() throws IOException { - // TODO - throw new UnsupportedOperationException(); + return buildLoadModel(table); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java new file mode 100644 index 0000000..25c34e0 --- /dev/null +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.avro.generic.GenericData; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.CharEncoding; +import org.junit.Assert; +import org.junit.Test; + +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; +import org.apache.avro.Schema; + +public class AvroCarbonWriterTest { + private String path = "./AvroCarbonWriterSuiteWriteFiles"; + + @Test + public void testWriteBasic() throws IOException { + FileUtils.deleteDirectory(new File(path)); + + // Avro schema + String avroSchema = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"Acme\"," + + " \"fields\" : [" + + "{ \"name\" : \"name\", \"type\" : \"string\" }," + + "{ \"name\" : \"age\", \"type\" : \"int\" }]" + + "}"; + + String json = "{\"name\":\"bob\", \"age\":10}"; + + // conversion to GenericData.Record + JsonAvroConverter converter = new JsonAvroConverter(); + GenericData.Record record = converter.convertToGenericDataRecord( + json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.STRING); + + try { + CarbonWriter writer = CarbonWriter.builder() + .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .outputPath(path) + .buildWriterForAvroInput(); + + for (int i = 0; i < 100; i++) { + writer.write(record); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(1, dataFiles.length); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testWriteAllPrimitive() throws IOException { + // TODO + } + + @Test + public void testWriteNestedRecord() throws IOException { + // TODO + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java deleted file mode 100644 index 0ac6f38..0000000 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.sdk.file; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -import org.apache.commons.io.FileUtils; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test suite for {@link CSVCarbonWriter} - */ -public class CSVCarbonWriterSuite { - - @Test - public void testWriteFiles() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(new Schema(fields), path); - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void testWriteFilesJsonSchema() throws IOException { - String path = "./testWriteFilesJsonSchema"; - FileUtils.deleteDirectory(new File(path)); - - String schema = new StringBuilder() - .append("[ \n") - .append(" {\"name\":\"string\"},\n") - .append(" {\"age\":\"int\"},\n") - .append(" {\"height\":\"double\"}\n") - .append("]") - .toString(); - - writeFilesAndVerify(Schema.parseJson(schema), path); - - FileUtils.deleteDirectory(new File(path)); - } - - private void writeFilesAndVerify(Schema schema, String path) { - writeFilesAndVerify(schema, path, null); - } - - private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { - writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1); - } - - private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { - writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); - } - - /** - * Invoke CarbonWriter API to write carbon files and assert the file is rewritten - * @param rows number of rows to write - * @param schema schema of the file - * @param path local write path - * @param sortColumns sort columns - * @param persistSchema true if want to persist schema file - * @param blockletSize blockletSize in the file, -1 for default size - * @param blockSize blockSize in the file, -1 for default size - */ - private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, - boolean persistSchema, int blockletSize, int blockSize) { - try { - CarbonWriterBuilder builder = CarbonWriter.builder() - .withSchema(schema) - .outputPath(path); - if (sortColumns != null) { - builder = builder.sortBy(sortColumns); - } - if (persistSchema) { - builder = builder.persistSchemaFile(true); - } - if (blockletSize != -1) { - builder = builder.withBlockletSize(blockletSize); - } - if (blockSize != -1) { - builder = builder.withBlockSize(blockSize); - } - - CarbonWriter writer = builder.buildWriterForCSVInput(); - - for (int i = 0; i < rows; i++) { - writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); - } - writer.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - - File segmentFolder 
= new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); - } - }); - Assert.assertNotNull(dataFiles); - Assert.assertTrue(dataFiles.length > 0); - } - - @Test - public void testAllPrimitiveDataType() throws IOException { - // TODO: write all data type and read by CarbonRecordReader to verify the content - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[9]; - fields[0] = new Field("stringField", DataTypes.STRING); - fields[1] = new Field("intField", DataTypes.INT); - fields[2] = new Field("shortField", DataTypes.SHORT); - fields[3] = new Field("longField", DataTypes.LONG); - fields[4] = new Field("doubleField", DataTypes.DOUBLE); - fields[5] = new Field("boolField", DataTypes.BOOLEAN); - fields[6] = new Field("dateField", DataTypes.DATE); - fields[7] = new Field("timeField", DataTypes.TIMESTAMP); - fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); - - try { - CarbonWriterBuilder builder = CarbonWriter.builder() - .withSchema(new Schema(fields)) - .outputPath(path); - - CarbonWriter writer = builder.buildWriterForCSVInput(); - - for (int i = 0; i < 100; i++) { - String[] row = new String[]{ - "robot" + (i % 10), - String.valueOf(i), - String.valueOf(i), - String.valueOf(Long.MAX_VALUE - i), - String.valueOf((double) i / 2), - String.valueOf(true), - "2019-03-02", - "2019-02-12 03:03:34" - }; - writer.write(row); - } - writer.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); - } - }); - Assert.assertNotNull(dataFiles); - Assert.assertTrue(dataFiles.length > 0); - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void test2Blocklet() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100); - - // TODO: implement reader to verify the number of blocklet in the file - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void test2Block() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2); - - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); - } - }); - Assert.assertNotNull(dataFiles); - Assert.assertEquals(2, dataFiles.length); - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void testSortColumns() throws IOException { - String path = 
"./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(new Schema(fields), path, new String[]{"name"}); - - // TODO: implement reader and verify the data is sorted - - FileUtils.deleteDirectory(new File(path)); - } - - @Test - public void testPartitionOutput() { - // TODO: test write data with partition - } - - @Test - public void testSchemaPersistence() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - writeFilesAndVerify(new Schema(fields), path, true); - - String schemaFile = CarbonTablePath.getSchemaFilePath(path); - Assert.assertTrue(new File(schemaFile).exists()); - - FileUtils.deleteDirectory(new File(path)); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java new file mode 100644 index 0000000..2281fe6 --- /dev/null +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test suite for {@link CSVCarbonWriter} + */ +public class CSVCarbonWriterTest { + + @Test + public void testWriteFiles() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testWriteFilesJsonSchema() throws IOException { + String path = "./testWriteFilesJsonSchema"; + FileUtils.deleteDirectory(new File(path)); + + String schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString(); + + writeFilesAndVerify(Schema.parseJson(schema), path); + + FileUtils.deleteDirectory(new File(path)); + } + + private void writeFilesAndVerify(Schema schema, String path) { + writeFilesAndVerify(schema, path, null); + } + + private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { + writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1); + } + + private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { + writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); + } + + /** + * Invoke CarbonWriter API to write carbon files and assert the file is rewritten + * @param rows number of rows to write + * @param schema schema of the file + * @param path local write path + * @param sortColumns sort columns + * @param persistSchema true if want to persist schema file + * @param blockletSize blockletSize in the file, -1 for default size + * @param blockSize blockSize in the file, -1 for default size + */ + private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, + boolean persistSchema, int blockletSize, int blockSize) { + try { + CarbonWriterBuilder builder = CarbonWriter.builder() + .withSchema(schema) + .outputPath(path); + if (sortColumns != null) { + builder = builder.sortBy(sortColumns); + } + if (persistSchema) { + builder = builder.persistSchemaFile(true); + } + if (blockletSize != -1) { + builder = builder.withBlockletSize(blockletSize); + } + if (blockSize != -1) { + builder = builder.withBlockSize(blockSize); + } + + CarbonWriter writer = builder.buildWriterForCSVInput(); + + for (int i = 0; i < rows; i++) { + writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertTrue(dataFiles.length > 0); + } + + @Test + public void 
testAllPrimitiveDataType() throws IOException { + // TODO: write all data type and read by CarbonRecordReader to verify the content + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[9]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("intField", DataTypes.INT); + fields[2] = new Field("shortField", DataTypes.SHORT); + fields[3] = new Field("longField", DataTypes.LONG); + fields[4] = new Field("doubleField", DataTypes.DOUBLE); + fields[5] = new Field("boolField", DataTypes.BOOLEAN); + fields[6] = new Field("dateField", DataTypes.DATE); + fields[7] = new Field("timeField", DataTypes.TIMESTAMP); + fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder() + .withSchema(new Schema(fields)) + .outputPath(path); + + CarbonWriter writer = builder.buildWriterForCSVInput(); + + for (int i = 0; i < 100; i++) { + String[] row = new String[]{ + "robot" + (i % 10), + String.valueOf(i), + String.valueOf(i), + String.valueOf(Long.MAX_VALUE - i), + String.valueOf((double) i / 2), + String.valueOf(true), + "2019-03-02", + "2019-02-12 03:03:34" + }; + writer.write(row); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertTrue(dataFiles.length > 0); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void test2Blocklet() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100); + + // TODO: implement reader to verify the number of blocklet in the file + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void test2Block() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2); + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(2, dataFiles.length); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testSortColumns() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path, new String[]{"name"}); + + // TODO: implement reader and verify the data is sorted + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void 
testPartitionOutput() { + // TODO: test write data with partition + } + + @Test + public void testSchemaPersistence() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path, true); + + String schemaFile = CarbonTablePath.getSchemaFilePath(path); + Assert.assertTrue(new File(schemaFile).exists()); + + FileUtils.deleteDirectory(new File(path)); + } + +}