http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala index 9ccc02c..755a7df 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala @@ -68,9 +68,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA //getCanonicalPath gives path with \, but the code expects /. 
writerPath = writerPath.replace("\\", "/"); - val filePath = writerPath + "/Fact/Part0/Segment_null/" - - def buildTestData(persistSchema:Boolean) = { + def buildTestData(): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -83,17 +81,9 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA .toString() try { - val builder = CarbonWriter.builder().isTransactionalTable(true) + val builder = CarbonWriter.builder() val writer = - if (persistSchema) { - builder.persistSchemaFile(true) - builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark - .sparkContext.hadoopConfiguration) - } else { - builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark - .sparkContext.hadoopConfiguration) - } - + builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build() var i = 0 while (i < 100) { writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) @@ -126,19 +116,19 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA //TO DO, need to remove segment dependency and tableIdentifier Dependency test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { - buildTestData(false) - assert(new File(filePath).exists()) + buildTestData() + assert(new File(writerPath).exists()) spark.sql("DROP TABLE IF EXISTS sdkOutputTable") //data source file format if (SparkUtil.isSparkVersionEqualTo("2.1")) { //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """) + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { //data source file format spark.sql( s"""CREATE TABLE sdkOutputTable USING carbon LOCATION - |'$filePath' """.stripMargin) + |'$writerPath' """.stripMargin) } else{ // TO DO } @@ -169,55 +159,55 @@ class TestCreateTableUsingSparkCarbonFileFormat 
extends FunSuite with BeforeAndA spark.sql("DROP TABLE sdkOutputTable") // drop table should not delete the files - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) cleanTestData() } test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { - buildTestData(false) - assert(new File(filePath).exists()) + buildTestData() + assert(new File(writerPath).exists()) spark.sql("DROP TABLE IF EXISTS sdkOutputTable") //data source file format if (SparkUtil.isSparkVersionEqualTo("2.1")) { //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """) + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { //data source file format spark.sql( s"""CREATE TABLE sdkOutputTable USING carbon LOCATION - |'$filePath' """.stripMargin) + |'$writerPath' """.stripMargin) } else { // TO DO } - val directSQL = spark.sql(s"""select * FROM carbon.`$filePath`""".stripMargin) + val directSQL = spark.sql(s"""select * FROM carbon.`$writerPath`""".stripMargin) directSQL.show(false) TestUtil.checkAnswer(spark.sql("select * from sdkOutputTable"), directSQL) spark.sql("DROP TABLE sdkOutputTable") // drop table should not delete the files - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) cleanTestData() } // TODO: Make the sparkCarbonFileFormat to work without index file test("Read sdk writer output file without Carbondata file should fail") { - buildTestData(false) + buildTestData() deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) spark.sql("DROP TABLE IF EXISTS sdkOutputTable") val exception = intercept[Exception] { // data source file format if (SparkUtil.isSparkVersionEqualTo("2.1")) { //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING 
carbon OPTIONS (PATH '$filePath') """) + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { //data source file format spark.sql( s"""CREATE TABLE sdkOutputTable USING carbon LOCATION - |'$filePath' """.stripMargin) + |'$writerPath' """.stripMargin) } else{ // TO DO } @@ -226,28 +216,28 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA .contains("CarbonData file is not present in the table location")) // drop table should not delete the files - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) cleanTestData() } test("Read sdk writer output file without any file should fail") { - buildTestData(false) + buildTestData() deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) spark.sql("DROP TABLE IF EXISTS sdkOutputTable") val exception = intercept[Exception] { //data source file format if (SparkUtil.isSparkVersionEqualTo("2.1")) { //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """) + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { //data source file format spark.sql( s"""CREATE TABLE sdkOutputTable USING carbon LOCATION - |'$filePath' """.stripMargin) + |'$writerPath' """.stripMargin) } else{ // TO DO } @@ -258,13 +248,13 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA .contains("CarbonData file is not present in the table location")) // drop table should not delete the files - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) cleanTestData() } test("Read sdk writer output file withSchema") { - buildTestData(true) - assert(new 
File(filePath).exists()) + buildTestData() + assert(new File(writerPath).exists()) spark.sql("DROP TABLE IF EXISTS sdkOutputTable") //data source file format @@ -272,12 +262,12 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA if (SparkUtil.isSparkVersionEqualTo("2.1")) { //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """) + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { //data source file format spark.sql( s"""CREATE TABLE sdkOutputTable USING carbon LOCATION - |'$filePath' """.stripMargin) + |'$writerPath' """.stripMargin) } else{ // TO DO } @@ -309,24 +299,24 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA spark.sql("DROP TABLE sdkOutputTable") // drop table should not delete the files - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) cleanTestData() } test("Read sdk writer output file without index file should not fail") { - buildTestData(false) + buildTestData() deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) spark.sql("DROP TABLE IF EXISTS sdkOutputTable") if (SparkUtil.isSparkVersionEqualTo("2.1")) { //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """) + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { //data source file format spark.sql( s"""CREATE TABLE sdkOutputTable USING carbon LOCATION - |'$filePath' """.stripMargin) + |'$writerPath' """.stripMargin) } else{ // TO DO } @@ -336,7 +326,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA spark.sql("DROP TABLE sdkOutputTable") // drop table should not delete the 
files - assert(new File(filePath).exists()) + assert(new File(writerPath).exists()) cleanTestData() } test("Read data having multi blocklet and validate min max flag") { @@ -388,7 +378,9 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA val emailDataBlocklet2 = "Email for testing min max for allowed chars" try{ val options=Map("bad_records_action"->"FORCE","complex_delimiter_level_1"->"$").asJava - val writer=CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16).sortBy(Array("myid","ingestion_time","event_id")).withLoadOptions(options).buildWriterForCSVInput(new Schema(fields),spark.sessionState.newHadoopConf()) + val writer = CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16) + .sortBy(Array("myid", "ingestion_time", "event_id")).withLoadOptions(options) + .withCsvInput(new Schema(fields)).build() val timeF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val date_F=new SimpleDateFormat("yyyy-MM-dd") for(i<- 1 to recordsInBlocklet1){ @@ -445,8 +437,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA .append("]") .toString() val builder = CarbonWriter.builder() - val writer = builder.outputPath(writerPath) - .buildWriterForCSVInput(Schema.parseJson(schema), spark.sessionState.newHadoopConf()) + val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build() for (i <- 0 until 3) { // write a varchar with 75,000 length writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index 3ae3d59..af3480f 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; @@ -34,7 +33,6 @@ import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.hadoop.api.CarbonFileInputFormat; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; @@ -51,7 +49,7 @@ public class CarbonReaderBuilder { private String[] projectionColumns; private Expression filterExpression; private String tableName; - private boolean isTransactionalTable; + private Configuration hadoopConf; /** * Construct a CarbonReaderBuilder with table path and table name @@ -79,21 +77,6 @@ public class CarbonReaderBuilder { } /** - * Configure the transactional status of table - * If set to false, then reads the carbondata and carbonindex files from a flat folder structure. - * If set to true, then reads the carbondata and carbonindex files from segment folder structure. 
- * Default value is false - * - * @param isTransactionalTable whether is transactional table or not - * @return CarbonReaderBuilder object - */ - public CarbonReaderBuilder isTransactionalTable(boolean isTransactionalTable) { - Objects.requireNonNull(isTransactionalTable); - this.isTransactionalTable = isTransactionalTable; - return this; - } - - /** * Configure the filter expression for carbon reader * * @param filterExpression filter expression @@ -106,72 +89,19 @@ public class CarbonReaderBuilder { } /** - * Set the access key for S3 - * - * @param key the string of access key for different S3 type,like: fs.s3a.access.key - * @param value the value of access key - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setAccessKey(String key, String value) { - FileFactory.getConfiguration().set(key, value); - return this; - } - - /** - * Set the access key for S3. - * - * @param value the value of access key - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setAccessKey(String value) { - return setAccessKey(Constants.ACCESS_KEY, value); - } - - /** - * Set the secret key for S3 - * - * @param key the string of secret key for different S3 type,like: fs.s3a.secret.key - * @param value the value of secret key - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setSecretKey(String key, String value) { - FileFactory.getConfiguration().set(key, value); - return this; - } - - /** - * Set the secret key for S3 - * - * @param value the value of secret key - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setSecretKey(String value) { - return setSecretKey(Constants.SECRET_KEY, value); - } - - /** - * Set the endpoint for S3 + * To support hadoop configuration * - * @param key the string of endpoint for different S3 type,like: fs.s3a.endpoint - * @param value the value of endpoint - * @return CarbonWriterBuilder object + * @param conf hadoop configuration support, can set s3a AK,SK,end 
point and other conf with this + * @return updated CarbonReaderBuilder */ - public CarbonReaderBuilder setEndPoint(String key, String value) { - FileFactory.getConfiguration().set(key, value); + public CarbonReaderBuilder withHadoopConf(Configuration conf) { + if (conf != null) { + this.hadoopConf = conf; + } return this; } /** - * Set the endpoint for S3 - * - * @param value the value of endpoint - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setEndPoint(String value) { - return setEndPoint(Constants.ENDPOINT, value); - } - - /** * Build CarbonReader * * @param <T> @@ -179,22 +109,19 @@ public class CarbonReaderBuilder { * @throws IOException * @throws InterruptedException */ - public <T> CarbonReader<T> build(Configuration configuration) + public <T> CarbonReader<T> build() throws IOException, InterruptedException { - // DB name is not applicable for SDK reader as, table will be never registered. + if (hadoopConf == null) { + hadoopConf = FileFactory.getConfiguration(); + } CarbonTable table; - if (isTransactionalTable) { - table = CarbonTable - .buildFromTablePath(tableName, "default", tablePath, UUID.randomUUID().toString()); + if (filterExpression != null) { + table = CarbonTable.buildTable(tablePath, tableName, hadoopConf); } else { - if (filterExpression != null) { - table = CarbonTable.buildTable(tablePath, tableName, configuration); - } else { - table = CarbonTable.buildDummyTable(tablePath); - } + table = CarbonTable.buildDummyTable(tablePath); } final CarbonFileInputFormat format = new CarbonFileInputFormat(); - final Job job = new Job(configuration); + final Job job = new Job(hadoopConf); format.setTableInfo(job.getConfiguration(), table.getTableInfo()); format.setTablePath(job.getConfiguration(), table.getTablePath()); format.setTableName(job.getConfiguration(), table.getTableName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java 
---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 02434fc..87930f6 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -33,26 +33,19 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.MapType; import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.TableSchema; import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.s3a.Constants; /** * Builder for 
{@link CarbonWriter} @@ -63,15 +56,20 @@ public class CarbonWriterBuilder { private Schema schema; private String path; private String[] sortColumns; - private boolean persistSchemaFile; private int blockletSize; private int blockSize; - private boolean isTransactionalTable; private long timestamp; private Map<String, String> options; private String taskNo; private int localDictionaryThreshold; private boolean isLocalDictionaryEnabled; + private short numOfThreads; + private Configuration hadoopConf; + private enum WRITER_TYPE { + CSV, AVRO, JSON + } + + private WRITER_TYPE writerType; /** * Sets the output path of the writer builder @@ -114,102 +112,6 @@ public class CarbonWriterBuilder { return this; } - - - /** - * If set, create a schema file in metadata folder. - * @param persist is a boolean value, If set to true, creates a schema file in metadata folder. - * By default set to false. will not create metadata folder - * @return updated CarbonWriterBuilder - */ - public CarbonWriterBuilder persistSchemaFile(boolean persist) { - this.persistSchemaFile = persist; - return this; - } - - /** - * If set false, writes the carbondata and carbonindex files in a flat folder structure - * @param isTransactionalTable is a boolelan value - * If set to false, then writes the carbondata and carbonindex files - * in a flat folder structure. - * If set to true, then writes the carbondata and carbonindex files - * in segment folder structure. - * By default set to false. 
- * @return updated CarbonWriterBuilder - */ - public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable) { - Objects.requireNonNull(isTransactionalTable, "Transactional Table should not be null"); - this.isTransactionalTable = isTransactionalTable; - return this; - } - - /** - * Set the access key for S3 - * - * @param key the string of access key for different S3 type,like: fs.s3a.access.key - * @param value the value of access key - * @return CarbonWriterBuilder - */ - public CarbonWriterBuilder setAccessKey(String key, String value) { - FileFactory.getConfiguration().set(key, value); - return this; - } - - /** - * Set the access key for S3. - * - * @param value the value of access key - * @return CarbonWriterBuilder - */ - public CarbonWriterBuilder setAccessKey(String value) { - return setAccessKey(Constants.ACCESS_KEY, value); - } - - /** - * Set the secret key for S3 - * - * @param key the string of secret key for different S3 type,like: fs.s3a.secret.key - * @param value the value of secret key - * @return CarbonWriterBuilder - */ - public CarbonWriterBuilder setSecretKey(String key, String value) { - FileFactory.getConfiguration().set(key, value); - return this; - } - - /** - * Set the secret key for S3 - * - * @param value the value of secret key - * @return CarbonWriterBuilder - */ - public CarbonWriterBuilder setSecretKey(String value) { - return setSecretKey(Constants.SECRET_KEY, value); - } - - /** - * Set the endpoint for S3 - * - * @param key the string of endpoint for different S3 type,like: fs.s3a.endpoint - * @param value the value of endpoint - * @return CarbonWriterBuilder - */ - public CarbonWriterBuilder setEndPoint(String key, String value) { - FileFactory.getConfiguration().set(key, value); - return this; - } - - /** - * Set the endpoint for S3 - * - * @param value the value of endpoint - * @return CarbonWriterBuilder - */ - public CarbonWriterBuilder setEndPoint(String value) { - 
FileFactory.getConfiguration().set(Constants.ENDPOINT, value); - return this; - } - /** * to set the timestamp in the carbondata and carbonindex index files * @param timestamp is a timestamp to be used in the carbondata and carbonindex index files. @@ -286,6 +188,7 @@ public class CarbonWriterBuilder { * c. local_dictionary_threshold -- positive value, default is 10000 * d. local_dictionary_enable -- true / false. Default is false * e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted. + * If empty string "" is passed. No columns are sorted * j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is "local_sort" * k. long_string_columns -- comma separated string columns which are more than 32k length. * default value is null. @@ -316,7 +219,12 @@ public class CarbonWriterBuilder { this.enableLocalDictionary((entry.getValue().equalsIgnoreCase("true"))); } else if (entry.getKey().equalsIgnoreCase("sort_columns")) { //sort columns - String[] sortColumns = entry.getValue().split(","); + String[] sortColumns; + if (entry.getValue().trim().isEmpty()) { + sortColumns = new String[0]; + } else { + sortColumns = entry.getValue().split(","); + } this.sortBy(sortColumns); } else if (entry.getKey().equalsIgnoreCase("sort_scope")) { this.withSortScope(entry); @@ -328,6 +236,36 @@ public class CarbonWriterBuilder { } /** + * To make sdk writer thread safe. + * + * @param numOfThreads should number of threads in which writer is called in multi-thread scenario + * default sdk writer is not thread safe. + * can use one writer instance in one thread only. + * @return updated CarbonWriterBuilder + */ + public CarbonWriterBuilder withThreadSafe(short numOfThreads) { + if (numOfThreads < 1) { + throw new IllegalArgumentException("number of threads cannot be lesser than 1. 
" + + "suggest to keep two times the number of cores available"); + } + this.numOfThreads = numOfThreads; + return this; + } + + /** + * To support hadoop configuration + * + * @param conf hadoop configuration support, can set s3a AK,SK,end point and other conf with this + * @return updated CarbonWriterBuilder + */ + public CarbonWriterBuilder withHadoopConf(Configuration conf) { + if (conf != null) { + this.hadoopConf = conf; + } + return this; + } + + /** * To set the carbondata file size in MB between 1MB-2048MB * @param blockSize is size in MB between 1MB to 2048 MB * default value is 1024 MB @@ -379,138 +317,79 @@ public class CarbonWriterBuilder { } /** - * This writer is not thread safe, - * use buildThreadSafeWriterForCSVInput in multi thread environment - * Build a {@link CarbonWriter}, which accepts row in CSV format - * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema} - * @return CSVCarbonWriter - * @throws IOException - * @throws InvalidLoadOptionException - */ - public CarbonWriter buildWriterForCSVInput(Schema schema, Configuration configuration) - throws IOException, InvalidLoadOptionException { - Objects.requireNonNull(schema, "schema should not be null"); - Objects.requireNonNull(path, "path should not be null"); - this.schema = schema; - CarbonLoadModel loadModel = buildLoadModel(schema); - return new CSVCarbonWriter(loadModel, configuration); - } - - /** + * to build a {@link CarbonWriter}, which accepts row in CSV format * - * Build a {@link CarbonWriter}, which accepts row in CSV format * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema} - * @param numOfThreads number of threads() in which .write will be called. 
- * @return CSVCarbonWriter - * @throws IOException - * @throws InvalidLoadOptionException + * @return CarbonWriterBuilder */ - public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, - Configuration configuration) throws IOException, InvalidLoadOptionException { + public CarbonWriterBuilder withCsvInput(Schema schema) { Objects.requireNonNull(schema, "schema should not be null"); - Objects.requireNonNull(numOfThreads, "numOfThreads should not be null"); - Objects.requireNonNull(path, "path should not be null"); this.schema = schema; - if (numOfThreads <= 0) { - throw new IllegalArgumentException(" numOfThreads must be greater than 0"); - } - CarbonLoadModel loadModel = buildLoadModel(schema); - loadModel.setSdkWriterCores(numOfThreads); - return new CSVCarbonWriter(loadModel, configuration); - } - - /** - * This writer is not thread safe, - * use buildThreadSafeWriterForAvroInput in multi thread environment - * Build a {@link CarbonWriter}, which accepts Avro object - * @param avroSchema avro Schema object {org.apache.avro.Schema} - * @return AvroCarbonWriter - * @throws IOException - * @throws InvalidLoadOptionException - */ - public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema, - Configuration configuration) throws IOException, InvalidLoadOptionException { - this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema); - Objects.requireNonNull(schema, "schema should not be null"); - Objects.requireNonNull(path, "path should not be null"); - CarbonLoadModel loadModel = buildLoadModel(schema); - // AVRO records are pushed to Carbon as Object not as Strings. This was done in order to - // handle multi level complex type support. As there are no conversion converter step is - // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder - // which will skip Conversion Step. 
- loadModel.setLoadWithoutConverterStep(true); - return new AvroCarbonWriter(loadModel, configuration); + this.writerType = WRITER_TYPE.CSV; + return this; } /** - * Build a {@link CarbonWriter}, which accepts Avro object + * to build a {@link CarbonWriter}, which accepts Avro object + * * @param avroSchema avro Schema object {org.apache.avro.Schema} - * @param numOfThreads number of threads() in which .write will be called. - * @return AvroCarbonWriter - * @throws IOException - * @throws InvalidLoadOptionException + * @return CarbonWriterBuilder */ - public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, - short numOfThreads, Configuration configuration) - throws IOException, InvalidLoadOptionException { + public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) { + Objects.requireNonNull(avroSchema, "Avro schema should not be null"); this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema); - Objects.requireNonNull(schema, "schema should not be null"); - Objects.requireNonNull(path, "path should not be null"); - Objects.requireNonNull(numOfThreads, "numOfThreads should not be null"); - if (numOfThreads <= 0) { - throw new IllegalArgumentException(" numOfThreads must be greater than 0"); - } - CarbonLoadModel loadModel = buildLoadModel(schema); - // AVRO records are pushed to Carbon as Object not as Strings. This was done in order to - // handle multi level complex type support. As there are no conversion converter step is - // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder - // which will skip Conversion Step. 
- loadModel.setLoadWithoutConverterStep(true); - loadModel.setSdkWriterCores(numOfThreads); - return new AvroCarbonWriter(loadModel, configuration); + this.writerType = WRITER_TYPE.AVRO; + return this; } /** - * This writer is not thread safe, - * use buildThreadSafeWriterForJsonInput in multi thread environment - * Build a {@link CarbonWriter}, which accepts Json object + * to build a {@link CarbonWriter}, which accepts Json object + * * @param carbonSchema carbon Schema object - * @return JsonCarbonWriter - * @throws IOException - * @throws InvalidLoadOptionException + * @return CarbonWriterBuilder */ - public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration) - throws IOException, InvalidLoadOptionException { + public CarbonWriterBuilder withJsonInput(Schema carbonSchema) { Objects.requireNonNull(carbonSchema, "schema should not be null"); - Objects.requireNonNull(path, "path should not be null"); this.schema = carbonSchema; - CarbonLoadModel loadModel = buildLoadModel(carbonSchema); - loadModel.setJsonFileLoad(true); - return new JsonCarbonWriter(loadModel, configuration); + this.writerType = WRITER_TYPE.JSON; + return this; } /** - * Can use this writer in multi-thread instance. + * Build a {@link CarbonWriter} + * This writer is not thread safe, + * use withThreadSafe() configuration in multi thread environment * - * Build a {@link CarbonWriter}, which accepts Json object - * @param carbonSchema carbon Schema object - * @param numOfThreads number of threads() in which .write will be called. 
- * @return JsonCarbonWriter + * @return CarbonWriter {AvroCarbonWriter/CSVCarbonWriter/JsonCarbonWriter based on Input Type } * @throws IOException * @throws InvalidLoadOptionException */ - public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, - Configuration configuration) throws IOException, InvalidLoadOptionException { - Objects.requireNonNull(carbonSchema, "schema should not be null"); + public CarbonWriter build() throws IOException, InvalidLoadOptionException { Objects.requireNonNull(path, "path should not be null"); - if (numOfThreads <= 0) { - throw new IllegalArgumentException(" numOfThreads must be greater than 0"); + if (this.writerType == null) { + throw new IOException( + "Writer type is not set, use withCsvInput() or withAvroInput() or withJsonInput() " + + "API based on input"); } - this.schema = carbonSchema; CarbonLoadModel loadModel = buildLoadModel(schema); - loadModel.setJsonFileLoad(true); loadModel.setSdkWriterCores(numOfThreads); - return new JsonCarbonWriter(loadModel, configuration); + if (hadoopConf == null) { + hadoopConf = FileFactory.getConfiguration(); + } + if (this.writerType == WRITER_TYPE.AVRO) { + // AVRO records are pushed to Carbon as Object not as Strings. This was done in order to + // handle multi level complex type support. As there are no conversion converter step is + // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder + // which will skip Conversion Step. 
+ loadModel.setLoadWithoutConverterStep(true); + return new AvroCarbonWriter(loadModel, hadoopConf); + } else if (this.writerType == WRITER_TYPE.JSON) { + loadModel.setJsonFileLoad(true); + return new JsonCarbonWriter(loadModel, hadoopConf); + } else { + // CSV + return new CSVCarbonWriter(loadModel, hadoopConf); + } } private void setCsvHeader(CarbonLoadModel model) { @@ -542,10 +421,6 @@ public class CarbonWriterBuilder { this.schema = updateSchemaFields(carbonSchema, longStringColumns); // build CarbonTable using schema CarbonTable table = buildCarbonTable(); - if (persistSchemaFile) { - // we are still using the traditional carbon table folder structure - persistSchemaFile(table, CarbonTablePath.getSchemaFilePath(path)); - } // build LoadModel return buildLoadModel(table, timestamp, taskNo, options); } @@ -614,18 +489,13 @@ public class CarbonWriterBuilder { tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList)); String tableName; String dbName; - if (isTransactionalTable) { - tableName = "_tempTable"; - dbName = "_tempDB"; - } else { - dbName = ""; - tableName = "_tempTable_" + String.valueOf(timestamp); - } + dbName = ""; + tableName = "_tempTable_" + String.valueOf(timestamp); TableSchema schema = tableSchemaBuilder.build(); schema.setTableName(tableName); CarbonTable table = CarbonTable.builder().tableName(schema.getTableName()).databaseName(dbName).tablePath(path) - .tableSchema(schema).isTransactionalTable(isTransactionalTable).build(); + .tableSchema(schema).isTransactionalTable(false).build(); return table; } @@ -708,36 +578,6 @@ public class CarbonWriterBuilder { } /** - * Save the schema of the {@param table} to {@param persistFilePath} - * @param table table object containing schema - * @param persistFilePath absolute file path with file name - */ - private void persistSchemaFile(CarbonTable table, String persistFilePath) throws IOException { - TableInfo tableInfo = table.getTableInfo(); - String schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(persistFilePath); - CarbonMetadata.getInstance().loadTableMetadata(tableInfo); - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - org.apache.carbondata.format.TableInfo thriftTableInfo = - schemaConverter.fromWrapperToExternalTableInfo( - tableInfo, - tableInfo.getDatabaseName(), - tableInfo.getFactTable().getTableName()); - org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry = - new org.apache.carbondata.format.SchemaEvolutionEntry( - tableInfo.getLastUpdatedTime()); - thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history() - .add(schemaEvolutionEntry); - FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath); - if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { - FileFactory.mkdirs(schemaMetadataPath, fileType); - } - ThriftWriter thriftWriter = new ThriftWriter(persistFilePath, false); - thriftWriter.open(); - thriftWriter.write(thriftTableInfo); - thriftWriter.close(); - } - - /** * Build a {@link CarbonLoadModel} */ private CarbonLoadModel buildLoadModel(CarbonTable table, long timestamp, String taskNo, http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java index f4c2408..dc6e012 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java @@ -29,7 +29,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonProperties; -import 
org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -68,25 +67,19 @@ public class TestUtil { } } - static void writeFilesAndVerify(Schema schema, String path) { - writeFilesAndVerify(schema, path, null); - } - static void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { writeFilesAndVerify( - 100, schema, path, sortColumns, false, -1, -1, true); + 100, schema, path, sortColumns, -1, -1); } - public static void writeFilesAndVerify( - int rows, Schema schema, String path, boolean persistSchema) { + public static void writeFilesAndVerify(int rows, Schema schema, String path) { writeFilesAndVerify( - rows, schema, path, null, persistSchema, -1, -1, true); + rows, schema, path, null, -1, -1); } - public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema, - boolean isTransactionalTable) { + public static void writeFilesAndVerify(Schema schema, String path) { writeFilesAndVerify( - 100, schema, path, null, persistSchema, -1, -1, isTransactionalTable); + 100, schema, path, null, -1, -1); } /** @@ -100,7 +93,7 @@ public class TestUtil { */ public static void writeFilesAndVerify( int rows, Schema schema, String path, boolean persistSchema, boolean isTransactionalTable) { - writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable); + writeFilesAndVerify(rows, schema, path, null, -1, -1); } /** @@ -109,23 +102,17 @@ public class TestUtil { * @param schema schema of the file * @param path local write path * @param sortColumns sort columns - * @param persistSchema true if want to persist schema file * @param blockletSize blockletSize in the file, -1 for default size * @param blockSize blockSize in the file, -1 for default size - * @param isTransactionalTable set to true if this is written for Transactional Table. 
*/ public static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, - boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) { + int blockletSize, int blockSize) { try { CarbonWriterBuilder builder = CarbonWriter.builder() - .isTransactionalTable(isTransactionalTable) .outputPath(path); if (sortColumns != null) { builder = builder.sortBy(sortColumns); } - if (persistSchema) { - builder = builder.persistSchemaFile(true); - } if (blockletSize != -1) { builder = builder.withBlockletSize(blockletSize); } @@ -133,7 +120,7 @@ public class TestUtil { builder = builder.withBlockSize(blockSize); } - CarbonWriter writer = builder.buildWriterForCSVInput(schema, configuration); + CarbonWriter writer = builder.withCsvInput(schema).build(); for (int i = 0; i < rows; i++) { writer.write(new String[]{ @@ -145,20 +132,7 @@ public class TestUtil { throw new RuntimeException(e); } - File segmentFolder = null; - if (isTransactionalTable) { - segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - if (!segmentFolder.exists()) { - throw new RuntimeException("Test failed: file not exists"); - } - } else { - segmentFolder = new File(path); - if (!segmentFolder.exists()) { - throw new RuntimeException("Test failed: file not exists"); - } - } - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java index e43f750..ed2be51 100644 --- 
a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java +++ b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java @@ -22,8 +22,11 @@ import java.util.HashMap; import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.CarbonUtil; @@ -42,12 +45,18 @@ abstract class MetaCachedCarbonStore implements CarbonStore { if (cache.containsKey(path)) { return cache.get(path); } - org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil - .readSchemaFile(CarbonTablePath.getSchemaFilePath(path)); - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", ""); - tableInfo1.setTablePath(path); - CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1); + String schemaPath = CarbonTablePath.getSchemaFilePath(path); + TableInfo tableInfo; + if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { + tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path), false); + } else { + org.apache.carbondata.format.TableInfo tableInfoFormat; + tableInfoFormat = CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(path)); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + tableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfoFormat, "", "", ""); + tableInfo.setTablePath(path); + } + 
CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo); cache.put(path, table); return table; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java index abaa7f1..53ed90f 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -20,7 +20,6 @@ package org.apache.carbondata.sdk.file; import java.io.File; import java.io.FileFilter; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -31,15 +30,12 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.SchemaReader; -import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.avro.generic.GenericData; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.CharEncoding; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -88,8 +84,8 @@ public class AvroCarbonWriterTest { // conversion to GenericData.Record GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema); try { - CarbonWriter writer = 
CarbonWriter.builder().outputPath(path).isTransactionalTable(true) - .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration); + CarbonWriter writer = CarbonWriter.builder().outputPath(path) + .withAvroInput(new Schema.Parser().parse(avroSchema)).build(); for (int i = 0; i < 100; i++) { writer.write(record); @@ -100,10 +96,7 @@ public class AvroCarbonWriterTest { Assert.fail(e.getMessage()); } - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -156,8 +149,8 @@ public class AvroCarbonWriterTest { try { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) - .isTransactionalTable(true) - .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration); + + .withAvroInput(new Schema.Parser().parse(avroSchema)).build(); for (int i = 0; i < 100; i++) { writer.write(record); @@ -168,10 +161,7 @@ public class AvroCarbonWriterTest { Assert.fail(e.getMessage()); } - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -246,11 +236,7 @@ public class AvroCarbonWriterTest { GenericData.Record record = TestUtil.jsonToAvro(json, mySchema); try { - CarbonWriter writer = CarbonWriter.builder() - .outputPath(path) - .isTransactionalTable(true) - .buildWriterForAvroInput(nn, TestUtil.configuration); - + CarbonWriter writer = 
CarbonWriter.builder().outputPath(path).withAvroInput(nn).build(); for (int i = 0; i < 100; i++) { writer.write(record); } @@ -260,10 +246,7 @@ public class AvroCarbonWriterTest { Assert.fail(e.getMessage()); } - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -307,11 +290,7 @@ public class AvroCarbonWriterTest { GenericData.Record record = TestUtil.jsonToAvro(json, mySchema); try { - CarbonWriter writer = CarbonWriter.builder() - .outputPath(path) - .isTransactionalTable(true) - .buildWriterForAvroInput(nn, TestUtil.configuration); - + CarbonWriter writer = CarbonWriter.builder().outputPath(path).withAvroInput(nn).build(); for (int i = 0; i < 100; i++) { writer.write(record); } @@ -321,10 +300,7 @@ public class AvroCarbonWriterTest { Assert.fail(e.getMessage()); } - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -337,17 +313,14 @@ public class AvroCarbonWriterTest { private void WriteAvroComplexData(String mySchema, String json, String[] sortColumns) - throws UnsupportedEncodingException, IOException, InvalidLoadOptionException { + throws IOException, InvalidLoadOptionException { // conversion to GenericData.Record Schema nn = new Schema.Parser().parse(mySchema); GenericData.Record record = TestUtil.jsonToAvro(json, mySchema); try { - CarbonWriter writer = CarbonWriter.builder() - .outputPath(path) - 
.isTransactionalTable(true).sortBy(sortColumns) - .buildWriterForAvroInput(nn, TestUtil.configuration); - + CarbonWriter writer = + CarbonWriter.builder().outputPath(path).sortBy(sortColumns).withAvroInput(nn).build(); for (int i = 0; i < 100; i++) { writer.write(record); } @@ -396,10 +369,7 @@ public class AvroCarbonWriterTest { WriteAvroComplexData(mySchema, json, null); - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -460,11 +430,11 @@ public class AvroCarbonWriterTest { Field[] field = new Field[2]; field[0] = new Field("name", DataTypes.STRING); field[1] = new Field("name", DataTypes.STRING); - CarbonWriterBuilder writer = CarbonWriter.builder().isTransactionalTable(false) + CarbonWriterBuilder writer = CarbonWriter.builder() .uniqueIdentifier(System.currentTimeMillis()).outputPath(path); try { - writer.buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration); + writer.withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).build(); Assert.fail(); } catch (Exception e) { assert(e.getMessage().contains("Duplicate column name found in table schema")); @@ -477,14 +447,14 @@ public class AvroCarbonWriterTest { Field[] field = new Field[2]; field[0] = new Field("name", DataTypes.STRING); field[1] = new Field("date", DataTypes.DATE); - CarbonWriterBuilder writer = CarbonWriter.builder().isTransactionalTable(false) + CarbonWriterBuilder writer = CarbonWriter.builder() .uniqueIdentifier(System.currentTimeMillis()).outputPath(path); try { Map<String, String> loadOptions = new HashMap<String, String>(); loadOptions.put("bad_records_action", "fail"); CarbonWriter carbonWriter = - 
writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration); + writer.withLoadOptions(loadOptions).withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).build(); carbonWriter.write(new String[] { "k", "20-02-2233" }); carbonWriter.close(); Assert.fail(); @@ -510,15 +480,15 @@ public class AvroCarbonWriterTest { // conversion to GenericData.Record GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema); try { - CarbonWriter writer = CarbonWriter.builder().outputPath(path).isTransactionalTable(true) - .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration); + CarbonWriter writer = CarbonWriter.builder().outputPath(path) + .withAvroInput(new Schema.Parser().parse(avroSchema)).build(); for (int i = 0; i < 100; i++) { writer.write(record); } writer.close(); TableInfo tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "", - ""), true, TestUtil.configuration); + ""), false); List<String> dataTypes = new ArrayList<>(); for(ColumnSchema columnSchema: tableInfo.getFactTable().getListOfColumns()) { dataTypes.add(columnSchema.getDataType().toString()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 5bc453b..ba6d772 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -34,7 +34,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import 
org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.StructField; -import org.apache.carbondata.core.metadata.datatype.StructType; import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; @@ -123,11 +122,8 @@ public class CSVCarbonWriterTest { fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); try { - CarbonWriterBuilder builder = CarbonWriter.builder() - .isTransactionalTable(true) - .outputPath(path); - - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path); + CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); for (int i = 0; i < 100; i++) { String[] row = new String[]{ @@ -148,10 +144,7 @@ public class CSVCarbonWriterTest { Assert.fail(e.getMessage()); } - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -171,7 +164,7 @@ public class CSVCarbonWriterTest { fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100, false); + TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 1, 100); // TODO: implement reader to verify the number of blocklet in the file @@ -187,10 +180,8 @@ public class CSVCarbonWriterTest { fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - 
TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2, true); - - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 2, 2); + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -222,23 +213,6 @@ public class CSVCarbonWriterTest { // TODO: test write data with partition } - @Test - public void testSchemaPersistence() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true); - - String schemaFile = CarbonTablePath.getSchemaFilePath(path); - Assert.assertTrue(new File(schemaFile).exists()); - - FileUtils.deleteDirectory(new File(path)); - } - @Test(expected = IOException.class) public void testWhenWriterthrowsError() throws IOException{ CarbonWriter carbonWriter = null; @@ -249,8 +223,8 @@ public class CSVCarbonWriterTest { fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); try { - carbonWriter = CarbonWriter.builder().isTransactionalTable(false). - outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + carbonWriter = CarbonWriter.builder(). + outputPath(path).withCsvInput(new Schema(fields)).build(); } catch (InvalidLoadOptionException e) { e.printStackTrace(); Assert.assertTrue(false); @@ -269,8 +243,8 @@ public class CSVCarbonWriterTest { fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); try { - carbonWriter = CarbonWriter.builder().isTransactionalTable(false). 
- outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + carbonWriter = CarbonWriter.builder(). + outputPath(path).withCsvInput(new Schema(fields)).build(); } catch (InvalidLoadOptionException e) { e.printStackTrace(); Assert.assertTrue(false); @@ -293,10 +267,10 @@ public class CSVCarbonWriterTest { try { CarbonWriterBuilder builder = CarbonWriter.builder() - .isTransactionalTable(true).taskNo(5) + .taskNo(5) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); for (int i = 0; i < 2; i++) { String[] row = new String[]{ @@ -307,10 +281,7 @@ public class CSVCarbonWriterTest { } writer.close(); - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + File[] dataFiles = new File(path).listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); } @@ -338,8 +309,7 @@ public class CSVCarbonWriterTest { fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - TestUtil.writeFilesAndVerify(1000000, new Schema(fields), path, new String[]{"name"}, - true, 3, 8, false); + TestUtil.writeFilesAndVerify(1000000, new Schema(fields), path, new String[]{"name"}, 3, 8); // read footer and verify number of blocklets CarbonFile folder = FileFactory.getCarbonFile(path); @@ -374,8 +344,8 @@ public class CSVCarbonWriterTest { fields[2] = new Field("doubleField", DataTypes.DOUBLE); try { - CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + CarbonWriterBuilder builder = 
CarbonWriter.builder().taskNo(5).outputPath(path); + CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); for (int i = 0; i < 15; i++) { String[] row = new String[] { "robot" + (i % 10), String.valueOf(i + "." + i), String.valueOf(i + "." + i) }; @@ -383,7 +353,7 @@ public class CSVCarbonWriterTest { } writer.close(); TableInfo tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "", - ""), true, TestUtil.configuration); + ""), false); List<String> dataTypes = new ArrayList<>(); for(ColumnSchema columnSchema: tableInfo.getFactTable().getListOfColumns()) { dataTypes.add(columnSchema.getDataType().toString()); @@ -409,15 +379,15 @@ public class CSVCarbonWriterTest { fields[1] = new Field("byteField", DataTypes.BYTE); try { - CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path); + CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); for (int i = 0; i < 15; i++) { String[] row = new String[] { "robot" + (i % 10), "" + i }; writer.write(row); } writer.close(); TableInfo tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "", - ""), true, TestUtil.configuration); + ""), false); List<String> dataTypes = new ArrayList<>(); for(ColumnSchema columnSchema: tableInfo.getFactTable().getListOfColumns()) { dataTypes.add(columnSchema.getDataType().toString()); @@ -443,15 +413,15 @@ public class CSVCarbonWriterTest { fields[2] = new Field("floatField", DataTypes.FLOAT); try { - CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path); + 
CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); for (int i = 0; i < 15; i++) { String[] row = new String[] { "robot" + (i % 10), "" + i, i + "." + i }; writer.write(row); } writer.close(); CarbonReader carbonReader = - new CarbonReaderBuilder(path, "table1").build(TestUtil.configuration); + new CarbonReaderBuilder(path, "table1").build(); for (int i = 0; i < 15; i++) { Object[] actualRow = (Object[]) carbonReader.readNextRow(); String[] expectedRow = new String[] { "robot" + (i % 10), "" + i, i + "." + i }; @@ -483,9 +453,8 @@ public class CSVCarbonWriterTest { Field structType = new Field("structField", "struct", Arrays.asList(fields)); try { - CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(new Field[] {structType}), - TestUtil.configuration); + CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path); + CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] {structType})).build(); for (int i = 0; i < 15; i++) { String[] row = new String[] { "robot" + (i % 10)+"$" + i+ "$" + i + "." 
+ i }; writer.write(row); @@ -523,9 +492,8 @@ public class CSVCarbonWriterTest { Field structType2 = new Field("bytearray", "array", Arrays.asList(fields2)); try { - CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(new Field[] {structType1, structType2}), - TestUtil.configuration); + CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path); + CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] {structType1, structType2})).build(); for (int i = 0; i < 15; i++) { String[] row = new String[] { "1.0$2.0$3.0", "1$2$3" }; writer.write(row); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java deleted file mode 100644 index a7ec6cd..0000000 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.sdk.file;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;

import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Test suite for {@link CSVCarbonWriter} in non-transactional mode: carbon
 * files are written directly to a target directory via the SDK
 * {@link CarbonWriter} API, without any segment/table metadata management.
 */
public class CSVNonTransactionalCarbonWriterTest {

  /**
   * Points the carbon system folder at the test classpath root and resets
   * the MDT file before each test so tests do not leak state into each other.
   */
  @Before
  public void cleanFile() {
    String path = null;
    try {
      path = new File(CSVNonTransactionalCarbonWriterTest.class.getResource("/").getPath() + "../")
          .getCanonicalPath().replaceAll("\\\\", "/");
    } catch (IOException e) {
      assert (false);
    }
    CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, path);
    assert (TestUtil.cleanMdtFile());
  }

  /** Verifies after each test that no MDT file was left behind. */
  @After
  public void verifyDMFile() {
    assert (!TestUtil.verifyMdtFile());
  }

  /** Writes a simple two-column schema and verifies fact files are produced. */
  @Test
  public void testWriteFiles() throws IOException {
    String path = "./testWriteFiles";
    FileUtils.deleteDirectory(new File(path));

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    writeFilesAndVerify(new Schema(fields), path);

    FileUtils.deleteDirectory(new File(path));
  }

  /** Same as {@link #testWriteFiles()} but builds the schema from JSON. */
  @Test
  public void testWriteFilesJsonSchema() throws IOException {
    String path = "./testWriteFilesJsonSchema";
    FileUtils.deleteDirectory(new File(path));

    String schema = new StringBuilder()
        .append("[ \n")
        .append("   {\"name\":\"string\"},\n")
        .append("   {\"age\":\"int\"},\n")
        .append("   {\"height\":\"double\"}\n")
        .append("]")
        .toString();

    writeFilesAndVerify(Schema.parseJson(schema), path);

    FileUtils.deleteDirectory(new File(path));
  }

  /** Convenience overload: 100 rows, no sort columns, no persisted schema. */
  private void writeFilesAndVerify(Schema schema, String path) {
    writeFilesAndVerify(schema, path, null);
  }

  /** Convenience overload: 100 rows with the given sort columns. */
  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
  }

  /** Convenience overload: 100 rows, optionally persisting the schema file. */
  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
  }

  /**
   * Invoke CarbonWriter API to write carbon files and assert the file is rewritten.
   *
   * @param rows number of rows to write
   * @param schema schema of the file
   * @param path local write path
   * @param sortColumns sort columns, or null for none
   * @param persistSchema true to also persist the schema file
   * @param blockletSize blocklet size in the file, -1 for default size
   * @param blockSize block size in the file, -1 for default size
   */
  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
      boolean persistSchema, int blockletSize, int blockSize) {
    try {
      // Non-transactional: files land directly under 'path', no segment dirs.
      CarbonWriterBuilder builder = CarbonWriter.builder()
          .isTransactionalTable(false)
          .uniqueIdentifier(System.currentTimeMillis())
          .taskNo(System.nanoTime())
          .outputPath(path);
      if (sortColumns != null) {
        builder = builder.sortBy(sortColumns);
      }
      if (persistSchema) {
        builder = builder.persistSchemaFile(true);
      }
      if (blockletSize != -1) {
        builder = builder.withBlockletSize(blockletSize);
      }
      if (blockSize != -1) {
        builder = builder.withBlockSize(blockSize);
      }

      CarbonWriter writer = builder.buildWriterForCSVInput(schema, TestUtil.configuration);

      for (int i = 0; i < rows; i++) {
        writer.write(
            new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
      }
      writer.close();
    } catch (IOException e) {
      e.printStackTrace();
      Assert.fail(e.getMessage());
    } catch (InvalidLoadOptionException l) {
      l.printStackTrace();
      Assert.fail(l.getMessage());
    }

    Assert.assertTrue(listDataFiles(path).length > 0);
  }

  /**
   * Lists the carbondata fact files directly under {@code path}, asserting
   * that the directory exists and the listing succeeded.
   */
  private static File[] listDataFiles(String path) {
    File segmentFolder = new File(path);
    Assert.assertTrue(segmentFolder.exists());
    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
      @Override public boolean accept(File pathname) {
        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
      }
    });
    Assert.assertNotNull(dataFiles);
    return dataFiles;
  }

  /** Writes one row per primitive data type supported by the SDK. */
  @Test
  public void testAllPrimitiveDataType() throws IOException {
    // TODO: write all data type and read by CarbonRecordReader to verify the content
    String path = "./testWriteFiles";
    FileUtils.deleteDirectory(new File(path));

    Field[] fields = new Field[9];
    fields[0] = new Field("stringField", DataTypes.STRING);
    fields[1] = new Field("intField", DataTypes.INT);
    fields[2] = new Field("shortField", DataTypes.SHORT);
    fields[3] = new Field("longField", DataTypes.LONG);
    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
    fields[6] = new Field("dateField", DataTypes.DATE);
    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));

    try {
      CarbonWriterBuilder builder = CarbonWriter.builder()
          .uniqueIdentifier(System.currentTimeMillis())
          .isTransactionalTable(false)
          .taskNo(System.nanoTime())
          .outputPath(path);

      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields),
          TestUtil.configuration);

      for (int i = 0; i < 100; i++) {
        // One value per schema field, in declaration order (9 fields -> 9 values;
        // the original test omitted the decimalField value).
        String[] row = new String[]{
            "robot" + (i % 10),
            String.valueOf(i),
            String.valueOf(i),
            String.valueOf(Long.MAX_VALUE - i),
            String.valueOf((double) i / 2),
            String.valueOf(true),
            "2019-03-02",
            "2019-02-12 03:03:34",
            String.valueOf((double) i / 2)
        };
        writer.write(row);
      }
      writer.close();
    } catch (Exception e) {
      e.printStackTrace();
      Assert.fail(e.getMessage());
    }

    Assert.assertTrue(listDataFiles(path).length > 0);

    FileUtils.deleteDirectory(new File(path));
  }

  /** Forces a tiny blocklet size so one load produces multiple blocklets. */
  @Test
  public void test2Blocklet() throws IOException {
    String path = "./testWriteFiles";
    FileUtils.deleteDirectory(new File(path));

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);

    // TODO: implement reader to verify the number of blocklet in the file

    FileUtils.deleteDirectory(new File(path));
  }

  /** Forces a tiny block size so the load is split into exactly two fact files. */
  @Test
  public void test2Block() throws IOException {
    String path = "./testWriteFiles";
    FileUtils.deleteDirectory(new File(path));

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);

    Assert.assertEquals(2, listDataFiles(path).length);

    FileUtils.deleteDirectory(new File(path));
  }

  /** Writes with a sort column configured. */
  @Test
  public void testSortColumns() throws IOException {
    String path = "./testWriteFiles";
    FileUtils.deleteDirectory(new File(path));

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});

    // TODO: implement reader and verify the data is sorted

    FileUtils.deleteDirectory(new File(path));
  }

  @Test
  public void testPartitionOutput() {
    // TODO: test write data with partition
  }

  /** Verifies that persistSchemaFile(true) writes a schema file under the path. */
  @Test
  public void testSchemaPersistence() throws IOException {
    String path = "./testWriteFiles";
    FileUtils.deleteDirectory(new File(path));

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    writeFilesAndVerify(new Schema(fields), path, true);

    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
    Assert.assertTrue(new File(schemaFile).exists());

    FileUtils.deleteDirectory(new File(path));
  }

}