Repository: carbondata Updated Branches: refs/heads/master 7c827c0a9 -> 8f1a029b9
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index 4859dd2..58bf3ab 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -179,7 +179,8 @@ public class CarbonReaderBuilder { * @throws IOException * @throws InterruptedException */ - public <T> CarbonReader<T> build() throws IOException, InterruptedException { + public <T> CarbonReader<T> build(Configuration configuration) + throws IOException, InterruptedException { // DB name is not applicable for SDK reader as, table will be never registered. CarbonTable table; if (isTransactionalTable) { @@ -193,7 +194,7 @@ public class CarbonReaderBuilder { } } final CarbonFileInputFormat format = new CarbonFileInputFormat(); - final Job job = new Job(new Configuration()); + final Job job = new Job(configuration); format.setTableInfo(job.getConfiguration(), table.getTableInfo()); format.setTablePath(job.getConfiguration(), table.getTablePath()); format.setTableName(job.getConfiguration(), table.getTableName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 89e69fb..28a0dde 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -45,14 +45,13 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.TableSchema; import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.util.CarbonSessionInfo; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Constants; /** @@ -74,10 +73,6 @@ public class CarbonWriterBuilder { private int localDictionaryThreshold; private boolean isLocalDictionaryEnabled; - public CarbonWriterBuilder() { - ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo()); - } - /** * Sets the output path of the writer builder * @param path is the absolute path where output files are written @@ -398,13 +393,13 @@ public class CarbonWriterBuilder { * @throws IOException * @throws InvalidLoadOptionException */ - public CarbonWriter buildWriterForCSVInput(Schema schema) + public CarbonWriter buildWriterForCSVInput(Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException { Objects.requireNonNull(schema, "schema should not be null"); Objects.requireNonNull(path, "path should not be null"); this.schema = schema; CarbonLoadModel loadModel = buildLoadModel(schema); - return new CSVCarbonWriter(loadModel); + return new CSVCarbonWriter(loadModel, configuration); } /** @@ -416,8 +411,8 @@ public class CarbonWriterBuilder { * @throws IOException * @throws InvalidLoadOptionException */ - public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads) - throws IOException, InvalidLoadOptionException { + public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, + Configuration configuration) throws IOException, InvalidLoadOptionException { Objects.requireNonNull(schema, "schema should not be null"); Objects.requireNonNull(numOfThreads, "numOfThreads should not be null"); Objects.requireNonNull(path, "path should not be null"); @@ -427,7 +422,7 @@ public class CarbonWriterBuilder { } CarbonLoadModel loadModel = buildLoadModel(schema); loadModel.setSdkWriterCores(numOfThreads); - return new CSVCarbonWriter(loadModel); + return new CSVCarbonWriter(loadModel, configuration); } /** @@ -439,8 +434,8 @@ public class CarbonWriterBuilder { * @throws IOException * @throws InvalidLoadOptionException */ - public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema) - throws IOException, InvalidLoadOptionException { + public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema, + Configuration configuration) throws IOException, InvalidLoadOptionException { this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema); Objects.requireNonNull(schema, "schema should not be null"); Objects.requireNonNull(path, "path should not be null"); @@ -450,7 +445,7 @@ public class CarbonWriterBuilder { // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder // which will skip Conversion Step. loadModel.setLoadWithoutConverterStep(true); - return new AvroCarbonWriter(loadModel); + return new AvroCarbonWriter(loadModel, configuration); } /** @@ -462,7 +457,7 @@ public class CarbonWriterBuilder { * @throws InvalidLoadOptionException */ public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, - short numOfThreads) + short numOfThreads, Configuration configuration) throws IOException, InvalidLoadOptionException { this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema); Objects.requireNonNull(schema, "schema should not be null"); @@ -478,7 +473,7 @@ public class CarbonWriterBuilder { // which will skip Conversion Step. loadModel.setLoadWithoutConverterStep(true); loadModel.setSdkWriterCores(numOfThreads); - return new AvroCarbonWriter(loadModel); + return new AvroCarbonWriter(loadModel, configuration); } /** @@ -490,14 +485,14 @@ public class CarbonWriterBuilder { * @throws IOException * @throws InvalidLoadOptionException */ - public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema) + public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration) throws IOException, InvalidLoadOptionException { Objects.requireNonNull(carbonSchema, "schema should not be null"); Objects.requireNonNull(path, "path should not be null"); this.schema = carbonSchema; CarbonLoadModel loadModel = buildLoadModel(carbonSchema); loadModel.setJsonFileLoad(true); - return new JsonCarbonWriter(loadModel); + return new JsonCarbonWriter(loadModel, configuration); } /** @@ -510,11 +505,10 @@ public class CarbonWriterBuilder { * @throws IOException * @throws InvalidLoadOptionException */ - public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads) - throws IOException, InvalidLoadOptionException { + public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, + Configuration configuration) throws IOException, InvalidLoadOptionException { Objects.requireNonNull(carbonSchema, "schema should not be null"); Objects.requireNonNull(path, "path should not be null"); - Objects.requireNonNull(numOfThreads, "numOfThreads should not be null"); if (numOfThreads <= 0) { throw new IllegalArgumentException(" numOfThreads must be greater than 0"); } @@ -522,7 +516,7 @@ public class CarbonWriterBuilder { CarbonLoadModel loadModel = buildLoadModel(schema); loadModel.setJsonFileLoad(true); loadModel.setSdkWriterCores(numOfThreads); - return new JsonCarbonWriter(loadModel); + return new JsonCarbonWriter(loadModel, configuration); } private void setCsvHeader(CarbonLoadModel model) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java index b6e7ad5..5f65539 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java @@ -23,7 +23,6 @@ import java.util.Random; import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -47,15 +46,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; private TaskAttemptContext context; private ObjectArrayWritable writable; - JsonCarbonWriter(CarbonLoadModel loadModel) throws IOException { - Configuration OutputHadoopConf = FileFactory.getConfiguration(); - CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel); + JsonCarbonWriter(CarbonLoadModel loadModel, Configuration configuration) throws IOException { + CarbonTableOutputFormat.setLoadModel(configuration, loadModel); CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat(); JobID jobId = new JobID(UUID.randomUUID().toString(), 0); Random random = new Random(); TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); - TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(configuration, attemptID); this.recordWriter = outputFormat.getRecordWriter(context); this.context = context; this.writable = new ObjectArrayWritable(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java index 4320edc..8f53125 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -83,7 +83,7 @@ public class AvroCarbonWriterTest { GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema); try { CarbonWriter writer = CarbonWriter.builder().outputPath(path).isTransactionalTable(true) - .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema)); + .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration); for (int i = 0; i < 100; i++) { writer.write(record); @@ -151,7 +151,7 @@ public class AvroCarbonWriterTest { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) .isTransactionalTable(true) - .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema)); + .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration); for (int i = 0; i < 100; i++) { writer.write(record); @@ -243,7 +243,7 @@ public class AvroCarbonWriterTest { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) .isTransactionalTable(true) - .buildWriterForAvroInput(nn); + .buildWriterForAvroInput(nn, TestUtil.configuration); for (int i = 0; i < 100; i++) { writer.write(record); @@ -304,7 +304,7 @@ public class AvroCarbonWriterTest { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) .isTransactionalTable(true) - .buildWriterForAvroInput(nn); + .buildWriterForAvroInput(nn, TestUtil.configuration); for (int i = 0; i < 100; i++) { writer.write(record); @@ -340,7 +340,7 @@ public class AvroCarbonWriterTest { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) .isTransactionalTable(true).sortBy(sortColumns) - .buildWriterForAvroInput(nn); + .buildWriterForAvroInput(nn, TestUtil.configuration); for (int i = 0; i < 100; i++) { writer.write(record); @@ -458,7 +458,7 @@ public class AvroCarbonWriterTest { .uniqueIdentifier(System.currentTimeMillis()).outputPath(path); try { - writer.buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field)); + writer.buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration); Assert.fail(); } catch (Exception e) { assert(e.getMessage().contains("Duplicate column name found in table schema")); @@ -478,7 +478,7 @@ public class AvroCarbonWriterTest { Map<String, String> loadOptions = new HashMap<String, String>(); loadOptions.put("bad_records_action", "fail"); CarbonWriter carbonWriter = - writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field)); + writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration); carbonWriter.write(new String[] { "k", "20-02-2233" }); carbonWriter.close(); Assert.fail(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index e71d061..3d59724 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -120,7 +120,7 @@ public class CSVCarbonWriterTest { .isTransactionalTable(true) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields)); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); for (int i = 0; i < 100; i++) { String[] row = new String[]{ @@ -243,7 +243,7 @@ public class CSVCarbonWriterTest { fields[1] = new Field("age", DataTypes.INT); try { carbonWriter = CarbonWriter.builder().isTransactionalTable(false). - outputPath(path).buildWriterForCSVInput(new Schema(fields)); + outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); } catch (InvalidLoadOptionException e) { e.printStackTrace(); Assert.assertTrue(false); @@ -263,7 +263,7 @@ public class CSVCarbonWriterTest { fields[1] = new Field("age", DataTypes.INT); try { carbonWriter = CarbonWriter.builder().isTransactionalTable(false). - outputPath(path).buildWriterForCSVInput(new Schema(fields)); + outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); } catch (InvalidLoadOptionException e) { e.printStackTrace(); Assert.assertTrue(false); @@ -289,7 +289,7 @@ public class CSVCarbonWriterTest { .isTransactionalTable(true).taskNo(5) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields)); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); for (int i = 0; i < 2; i++) { String[] row = new String[]{ http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java index c1d5f88..a7ec6cd 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java @@ -132,7 +132,7 @@ public class CSVNonTransactionalCarbonWriterTest { builder = builder.withBlockSize(blockSize); } - CarbonWriter writer = builder.buildWriterForCSVInput(schema); + CarbonWriter writer = builder.buildWriterForCSVInput(schema, TestUtil.configuration); for (int i = 0; i < rows; i++) { writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); @@ -183,7 +183,7 @@ public class CSVNonTransactionalCarbonWriterTest { .taskNo(System.nanoTime()) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields)); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); for (int i = 0; i < 100; i++) { String[] row = new String[]{ http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java index 54b3e9e..dedf30e 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java @@ -40,10 +40,13 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import junit.framework.TestCase; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.CharEncoding; +import org.apache.hadoop.conf.Configuration; import org.junit.*; public class CarbonReaderTest extends TestCase { + private Configuration conf = new Configuration(false); + @Before public void cleanFile() { assert (TestUtil.cleanMdtFile()); @@ -72,7 +75,7 @@ public class CarbonReaderTest extends TestCase { TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true); CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true) - .projection(new String[]{"name", "age"}).build(); + .projection(new String[]{"name", "age"}).build(conf); // expected output after sorting String[] name = new String[200]; @@ -99,7 +102,7 @@ public class CarbonReaderTest extends TestCase { .builder(path, "_temp") .isTransactionalTable(true) .projection(new String[]{"name", "age"}) - .build(); + .build(conf); i = 0; while (reader2.hasNext()) { @@ -134,7 +137,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(true) .projection(new String[]{"name", "age"}) .filter(equalToExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -176,7 +179,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(true) .projection(new String[]{"name", "age", "doubleField"}) .filter(andExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -213,7 +216,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age"}) .filter(equalToExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -249,7 +252,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age"}) .filter(equalToExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -292,7 +295,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age", "doubleField"}) .filter(andExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -335,7 +338,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age", "doubleField"}) .filter(orExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -378,7 +381,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age", "doubleField"}) .filter(andExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -421,7 +424,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age", "doubleField"}) .filter(andExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -464,7 +467,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age", "doubleField"}) .filter(andExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -507,7 +510,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age", "doubleField"}) .filter(andExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -550,7 +553,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age", "doubleField"}) .filter(andExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -582,7 +585,7 @@ public class CarbonReaderTest extends TestCase { CarbonWriter carbonWriter = null; try { carbonWriter = builder.outputPath(path1).isTransactionalTable(false).uniqueIdentifier(12345) - .buildWriterForCSVInput(schema); + .buildWriterForCSVInput(schema, TestUtil.configuration); } catch (InvalidLoadOptionException e) { e.printStackTrace(); } @@ -597,7 +600,7 @@ public class CarbonReaderTest extends TestCase { CarbonWriter carbonWriter1 = null; try { carbonWriter1 = builder1.outputPath(path2).isTransactionalTable(false).uniqueIdentifier(12345) - .buildWriterForCSVInput(schema1); + .buildWriterForCSVInput(schema1, TestUtil.configuration); } catch (InvalidLoadOptionException e) { e.printStackTrace(); } @@ -608,14 +611,14 @@ public class CarbonReaderTest extends TestCase { CarbonReader reader = CarbonReader.builder(path1, "_temp"). projection(new String[] { "c1", "c3" }) - .isTransactionalTable(false).build(); + .isTransactionalTable(false).build(conf); } catch (Exception e){ System.out.println("Success"); } CarbonReader reader1 = CarbonReader.builder(path2, "_temp1") .projection(new String[] { "p1", "p2" }) - .isTransactionalTable(false).build(); + .isTransactionalTable(false).build(conf); while (reader1.hasNext()) { Object[] row1 = (Object[]) reader1.readNextRow(); @@ -643,7 +646,7 @@ public class CarbonReaderTest extends TestCase { .builder(path, "_temp") .projection(new String[]{"name", "name", "age", "name"}) .isTransactionalTable(true) - .build(); + .build(conf); // expected output after sorting String[] name = new String[100]; @@ -685,13 +688,13 @@ public class CarbonReaderTest extends TestCase { .builder(path, "_temp") .projection(new String[]{"name", "age"}) .isTransactionalTable(true) - .build(); + .build(conf); // Reader 2 CarbonReader reader2 = CarbonReader .builder(path, "_temp") .projection(new String[]{"name", "age"}) .isTransactionalTable(true) - .build(); + .build(conf); while (reader.hasNext()) { Object[] row = (Object[]) reader.readNextRow(); @@ -719,7 +722,7 @@ public class CarbonReaderTest extends TestCase { TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true); CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true) - .projection(new String[]{"name", "age"}).build(); + .projection(new String[]{"name", "age"}).build(conf); reader.close(); String msg = "CarbonReader not initialise, please create it first."; @@ -762,7 +765,7 @@ public class CarbonReaderTest extends TestCase { .builder(path) .projection(new String[]{"name", "age"}) .isTransactionalTable(true) - .build(); + .build(conf); // expected output after sorting String[] name = new String[100]; @@ -799,7 +802,7 @@ public class CarbonReaderTest extends TestCase { CarbonReader reader = CarbonReader .builder(path) - .build(); + .build(conf); // expected output after sorting String[] name = new String[100]; @@ -910,7 +913,7 @@ public class CarbonReaderTest extends TestCase { CarbonReader reader = CarbonReader.builder(path, "_temp") .projection(new String[]{"name", "age"}) - .build(); + .build(conf); // expected output after sorting String[] name = new String[100]; @@ -987,7 +990,7 @@ public class CarbonReaderTest extends TestCase { .persistSchemaFile(true) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields)); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); for (int i = 0; i < 100; i++) { String[] row = new String[]{ @@ -1045,7 +1048,7 @@ public class CarbonReaderTest extends TestCase { , "dateField" , "timeField" , "decimalField"}) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -1108,7 +1111,7 @@ public class CarbonReaderTest extends TestCase { .persistSchemaFile(true) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields)); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); for (int i = 0; i < 100; i++) { String[] row2 = new String[]{ @@ -1161,7 +1164,7 @@ public class CarbonReaderTest extends TestCase { .builder(path, "_temp") .isTransactionalTable(true) .projection(strings) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -1224,7 +1227,7 @@ public class CarbonReaderTest extends TestCase { .persistSchemaFile(true) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields)); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); for (int i = 0; i < 100; i++) { String[] row2 = new String[]{ @@ -1275,7 +1278,7 @@ public class CarbonReaderTest extends TestCase { .builder(path, "_temp") .projection(strings) .isTransactionalTable(true) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -1338,7 +1341,7 @@ public class CarbonReaderTest extends TestCase { .persistSchemaFile(true) .outputPath(path); - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields)); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); for (int i = 0; i < 100; i++) { String[] row2 = new String[]{ @@ -1381,7 +1384,7 @@ public class CarbonReaderTest extends TestCase { .builder(path, "_temp") .isTransactionalTable(true) .projection(strings) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { @@ -1423,7 +1426,7 @@ public class CarbonReaderTest extends TestCase { CarbonReader reader = CarbonReader .builder(path, "_temp") .isTransactionalTable(true) - .build(); + .build(conf); // expected output after sorting String[] name = new String[100]; @@ -1461,7 +1464,7 @@ public class CarbonReaderTest extends TestCase { CarbonReader reader = CarbonReader .builder(path, "_temp") .isTransactionalTable(true) - .build(); + .build(conf); // expected output after sorting String[] name = new String[100]; @@ -1498,7 +1501,7 @@ public class CarbonReaderTest extends TestCase { .builder(path, "_temp") .projection(new String[]{}) .isTransactionalTable(true) - .build(); + .build(conf); assert (false); } catch (RuntimeException e) { assert (e.getMessage().equalsIgnoreCase("Projection can't be empty")); @@ -1517,7 +1520,7 @@ public class CarbonReaderTest extends TestCase { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) .isTransactionalTable(isTransactionalTable) - .buildWriterForAvroInput(nn); + .buildWriterForAvroInput(nn, TestUtil.configuration); for (int i = 0; i < 100; i++) { writer.write(record); @@ -1645,7 +1648,7 @@ public class CarbonReaderTest extends TestCase { CarbonReader reader = CarbonReader .builder(path, "_temp") .isTransactionalTable(false) - .build(); + .build(conf); // expected output String name = "bob"; @@ -1688,7 +1691,7 @@ public class CarbonReaderTest extends TestCase { .isTransactionalTable(false) .projection(new String[]{"name", "age"}) .filter(equalToExpression) - .build(); + .build(conf); int i = 0; while (reader.hasNext()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java index 9a6e8e0..4cb816b 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.avro.generic.GenericData; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; @@ -59,7 +60,8 @@ public class ConcurrentAvroSdkWriterTest { ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads); try { CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path); - CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads); + CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads, + TestUtil.configuration); // write in multi-thread for (int i = 0; i < numOfThreads; i++) { executorService.submit(new WriteLogic(writer, record)); @@ -76,7 +78,7 @@ public class ConcurrentAvroSdkWriterTest { CarbonReader reader; try { reader = - CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build(); + CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build(new Configuration(false)); int i = 0; while (reader.hasNext()) { Object[] row = (Object[]) reader.readNextRow(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java index 8ce1ef1..9bb3f29 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; @@ -53,7 +54,7 @@ public class ConcurrentSdkWriterTest { CarbonWriterBuilder builder = CarbonWriter.builder() .outputPath(path); CarbonWriter writer = - builder.buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads); + builder.buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads, TestUtil.configuration); // write in multi-thread for (int i = 0; i < numOfThreads; i++) { executorService.submit(new WriteLogic(writer)); @@ -72,7 +73,7 @@ public class ConcurrentSdkWriterTest { reader = CarbonReader .builder(path, "_temp") .projection(new String[]{"name", "age"}) - .build(); + .build(new Configuration(false)); int i = 0; while (reader.hasNext()) { Object[] row = (Object[]) reader.readNextRow(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java index 83026a2..2d5dbcd 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java @@ -38,10 +38,13 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.JsonDecoder; +import org.apache.hadoop.conf.Configuration; import org.junit.Assert; public class TestUtil { + public static Configuration configuration = new Configuration(); + public static GenericData.Record jsonToAvro(String json, String avroSchema) throws IOException { InputStream input = null; DataFileWriter writer = null; @@ -130,7 +133,7 @@ public class TestUtil { builder = builder.withBlockSize(blockSize); } - CarbonWriter writer = builder.buildWriterForCSVInput(schema); + CarbonWriter writer = builder.buildWriterForCSVInput(schema, configuration); for (int i = 0; i < rows; i++) { writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---------------------------------------------------------------------- diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java index 30aa415..7a764dd 100644 --- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java +++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -176,7 +177,7 @@ public class SearchRequestHandler { if (uniqueSegments.get(segmentId) == null) { segments.add(Segment.toSegment(segmentId, new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(), - loadMetadataDetails))); + loadMetadataDetails, FileFactory.getConfiguration()))); uniqueSegments.put(segmentId, 1); } else { uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
