http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index ff2ffdd..809d68b 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -53,8 +54,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     try {
       dataMap.init(new DataMapModel(
-          DataMapWriter.getDefaultDataMapPath(
-              tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
+          DataMapWriter.getDefaultDataMapPath(tableIdentifier.getTablePath(),
+              segment.getSegmentNo(), dataMapName), segment.getConfiguration()));
     } catch (MemoryException e) {
       LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
       return lstDataMap;
@@ -73,7 +74,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
     try {
-      dataMap.init(new DataMapModel(indexPath));
+      dataMap.init(new DataMapModel(indexPath, FileFactory.getConfiguration()));
     } catch (MemoryException e) {
       LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage()));
       return lstDataMap;
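For context, a minimal sketch of what the change above means for callers: a `DataMapModel` now carries a Hadoop `Configuration` alongside the index path. The helper class and method name here are hypothetical; `DataMapModel(String, Configuration)` and `FileFactory.getConfiguration()` are the calls the diff itself uses.

```java
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.hadoop.conf.Configuration;

public class DataMapModelSketch {
  public static DataMapModel modelFor(String indexPath) {
    // The configuration travels with the model, so the datamap can read its
    // index files with the caller's Hadoop settings (e.g. S3 credentials).
    Configuration conf = FileFactory.getConfiguration();
    return new DataMapModel(indexPath, conf);
  }
}
```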
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index c80cc75..9c1e18d 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -383,11 +383,12 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options);
 * this writer is not thread safe, use buildThreadSafeWriterForCSVInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts row in CSV format object
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+* @param configuration hadoop configuration object.
 * @return CSVCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
```
```
@@ -395,12 +396,13 @@ public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema
 * Can use this writer in multi-thread instance.
 * Build a {@link CarbonWriter}, which accepts row in CSV format
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-* @param numOfThreads number of threads() in which .write will be called.
+* @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
 * @return CSVCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, Configuration configuration)
  throws IOException, InvalidLoadOptionException;
```
@@ -410,11 +412,12 @@ public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfT
 * this writer is not thread safe, use buildThreadSafeWriterForAvroInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts Avro format object
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
+* @param configuration hadoop configuration object
 * @return AvroCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
```
```
@@ -423,11 +426,13 @@ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throw
 * Build a {@link CarbonWriter}, which accepts Avro object
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
 * @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
 * @return AvroCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short
+numOfThreads, Configuration configuration)
  throws IOException, InvalidLoadOptionException
```
@@ -437,11 +442,12 @@ public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avr
 * this writer is not thread safe, use buildThreadSafeWriterForJsonInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts Json object
 * @param carbonSchema carbon Schema object
+* @param configuration hadoop configuration object
 * @return JsonCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
+public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration);
```
```
@@ -450,11 +456,12 @@ public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
 * Build a {@link CarbonWriter}, which accepts Json object
 * @param carbonSchema carbon Schema object
 * @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object.
 * @return JsonCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
+public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, Configuration configuration)
```
### Class org.apache.carbondata.sdk.file.CarbonWriter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index ada1a8c..4eec4bf 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -23,6 +23,7 @@ import java.sql.Date;
 import java.sql.Timestamp;

 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;

 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.sdk.file.CarbonReader;
@@ -55,7 +56,7 @@ public class CarbonReaderExample {
         CarbonWriter writer = CarbonWriter.builder()
             .outputPath(path)
-            .buildWriterForCSVInput(new Schema(fields));
+            .buildWriterForCSVInput(new Schema(fields), new Configuration(false));

         for (int i = 0; i < 10; i++) {
           String[] row2 = new String[]{
@@ -98,7 +99,7 @@ public class CarbonReaderExample {
         CarbonReader reader = CarbonReader
             .builder(path, "_temp")
             .projection(strings)
-            .build();
+            .build(new Configuration(false));

         System.out.println("\nData:");
         long day = 24L * 3600 * 1000;
@@ -116,7 +117,7 @@ public class CarbonReaderExample {
         // Read data
         CarbonReader reader2 = CarbonReader
             .builder(path, "_temp")
-            .build();
+            .build(new Configuration(false));

         System.out.println("\nData:");
         i = 0;
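Putting the new SDK signatures together, here is a condensed write/read round trip in the style of CarbonReaderExample. This is a sketch, not part of the commit: the output path is arbitrary, and `new Configuration(false)` mirrors the bundled examples above.

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonReader;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class WriterWithConfSketch {
  public static void main(String[] args) throws Exception {
    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    // The Configuration argument is the new required parameter on every
    // build* method; an empty Configuration(false) works for local paths.
    Configuration conf = new Configuration(false);
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/carbon-sdk-out")
        .buildWriterForCSVInput(new Schema(fields), conf);
    writer.write(new String[]{"robot", "1"});
    writer.close();

    // Readers take the same configuration at build() time now.
    CarbonReader reader = CarbonReader.builder("/tmp/carbon-sdk-out", "_temp")
        .build(conf);
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      System.out.println(row[0] + " " + row[1]);
    }
    reader.close();
  }
}
```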
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 52d51b5..3abc342 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.sdk.file.*;

+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Example for testing CarbonWriter on S3
  */
@@ -56,7 +58,7 @@ public class SDKS3Example {
         .setEndPoint(args[2])
         .outputPath(path);

-    CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+    CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), new Configuration(false));

     for (int i = 0; i < num; i++) {
       writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
@@ -75,7 +77,7 @@ public class SDKS3Example {
         .setAccessKey(args[0])
         .setSecretKey(args[1])
         .setEndPoint(args[2])
-        .build();
+        .build(new Configuration(false));

     System.out.println("\nData:");
     int i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
index a011d80..86bf854 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.examples
 import java.io.File

 import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession

 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -36,7 +37,7 @@ object DirectSQLExample {
   def buildTestData(
       path: String,
       num: Int = 3,
-      persistSchema: Boolean = false): Any = {
+      persistSchema: Boolean = false, sparkSession: SparkSession): Any = {

     // getCanonicalPath gives path with \, but the code expects /.
     val writerPath = path.replace("\\", "/");
@@ -56,7 +57,8 @@ object DirectSQLExample {
       if (persistSchema) {
         builder.persistSchemaFile(true)
       }
-      val writer = builder.buildWriterForCSVInput(new Schema(fields))
+      val writer = builder
+        .buildWriterForCSVInput(new Schema(fields), sparkSession.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < num) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -82,7 +84,7 @@ object DirectSQLExample {
     import carbonSession._
     // 1. generate data file
     cleanTestData(path)
-    buildTestData(path, 20)
+    buildTestData(path, 20, sparkSession = carbonSession)
     val readPath = path + "Fact/Part0/Segment_null"

     println("Running SQL on carbon files directly")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 1795960..c5c9710 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.examples

+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.sql.SparkSession
 import org.slf4j.{Logger, LoggerFactory}
@@ -52,12 +53,12 @@ object S3UsingSDKExample {
       builder.outputPath(writerPath).isTransactionalTable(true)
         .uniqueIdentifier(
           System.currentTimeMillis)
-        .buildWriterForCSVInput(new Schema(fields))
+        .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
     } else {
       builder.outputPath(writerPath).isTransactionalTable(true)
         .uniqueIdentifier(
           System.currentTimeMillis).withBlockSize(2)
-        .buildWriterForCSVInput(new Schema(fields))
+        .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
     }
     var i = 0
     var row = num

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9e5edc1..fcfb346 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -113,11 +113,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       ReadCommittedScope readCommittedScope = null;
       if (carbonTable.isTransactionalTable()) {
         readCommittedScope = new LatestFilesReadCommittedScope(
-            identifier.getTablePath() + "/Fact/Part0/Segment_null/");
+            identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
       } else {
         readCommittedScope = getReadCommittedScope(job.getConfiguration());
         if (readCommittedScope == null) {
-          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(), job
+              .getConfiguration());
+        } else {
+          readCommittedScope.setConfiguration(job.getConfiguration());
         }
       }
       // this will be null in case of corrupt schema file.
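A minimal sketch of the scope-selection logic the hunk above introduces (the helper method name is hypothetical; the classes, `setConfiguration`, and the two-argument `LatestFilesReadCommittedScope` constructor are the ones in the diff, and the checked `IOException` is assumed from the constructor's signature in this release line):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;

public class ReadCommittedScopeSketch {
  // Reuse an externally supplied scope after refreshing its Configuration;
  // otherwise scan the latest files under the table path with the job's
  // Configuration, as CarbonFileInputFormat now does.
  static ReadCommittedScope scopeFor(String tablePath, ReadCommittedScope existing,
      Configuration jobConf) throws IOException {
    if (existing != null) {
      existing.setConfiguration(jobConf);
      return existing;
    }
    return new LatestFilesReadCommittedScope(tablePath, jobConf);
  }
}
```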
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index e5e3165..eb9ff7c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -188,8 +188,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
         context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
     List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
     Set<Segment> segmentSet = new HashSet<>(
-        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
-            .getValidAndInvalidSegments().getValidSegments());
+        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+            context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
     if (updateTime != null) {
       CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
           segmentDeleteList);
@@ -223,8 +223,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     if (partitionSpecs != null && partitionSpecs.size() > 0) {
       List<Segment> validSegments =
-          new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments()
-              .getValidSegments();
+          new SegmentStatusManager(table.getAbsoluteTableIdentifier())
+              .getValidAndInvalidSegments().getValidSegments();
       String uniqueId = String.valueOf(System.currentTimeMillis());
       List<String> tobeUpdatedSegs = new ArrayList<>();
       List<String> tobeDeletedSegs = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ba6e043..ba3accf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -143,7 +143,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<Segment> streamSegments = null;
     // get all valid segments and set them into the configuration
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
+        readCommittedScope.getConfiguration());
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
@@ -583,7 +584,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
-        new SegmentStatusManager(identifier)
+        new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
            .getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
@@ -649,11 +650,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     if (readCommittedScope == null) {
       ReadCommittedScope readCommittedScope;
       if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
-        readCommittedScope = new TableStatusReadCommittedScope(identifier);
+        readCommittedScope = new TableStatusReadCommittedScope(identifier, job.getConfiguration());
       } else {
         readCommittedScope = getReadCommittedScope(job.getConfiguration());
         if (readCommittedScope == null) {
-          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+          readCommittedScope =
+              new LatestFilesReadCommittedScope(identifier.getTablePath(), job.getConfiguration());
         }
       }
       this.readCommittedScope = readCommittedScope;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 99d8532..2d4f370 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -270,6 +270,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
             TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
             throw new RuntimeException(e);
+          } finally {
+            ThreadLocalSessionInfo.unsetAll();
           }
         }
       });
@@ -444,6 +446,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     } finally {
       executorService.shutdownNow();
       dataLoadExecutor.close();
+      ThreadLocalSessionInfo.unsetAll();
       // clean up the folders and files created locally for data load operation
       TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
     }
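The output-format hunks above add a cleanup step rather than a new parameter: the thread-local session info (which now carries the Hadoop Configuration) is cleared on every exit path. A sketch of that pattern, assuming `ThreadLocalSessionInfo` lives in `org.apache.carbondata.core.util` as in the core module; the wrapper class and method are hypothetical:

```java
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;

public class LoadCleanupSketch {
  // Whatever the load does, clear the thread-local session info so pooled
  // executor threads do not keep a stale Configuration across loads.
  static void runLoad(Runnable load) {
    try {
      load.run();
    } finally {
      ThreadLocalSessionInfo.unsetAll();
    }
  }
}
```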
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index de0d731..40a0a62 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -112,7 +112,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
         .outputPath(writerPath)
         .isTransactionalTable(false)
         .uniqueIdentifier(System.currentTimeMillis)
-        .buildWriterForCSVInput(Schema.parseJson(schema))
+        .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
     } else {
       if (options != null) {
         builder.outputPath(writerPath)
@@ -120,14 +120,15 @@
           .isTransactionalTable(false)
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-          .buildWriterForCSVInput(Schema.parseJson(schema))
+          .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
       } else {
         builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             System.currentTimeMillis).withBlockSize(2)
-          .buildWriterForCSVInput(Schema.parseJson(schema))
+          .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext
+            .hadoopConfiguration)
       }
     }
     var i = 0
@@ -544,7 +545,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -743,7 +744,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
     for (i <- 0 until 5) {
       writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(33000), i.toString))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index a1d4290..63fb2e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -21,6 +21,7 @@ import java.util

 import scala.collection.JavaConverters._

+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
@@ -304,7 +306,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     val resolveFilter: FilterResolverIntf =
       CarbonTable.resolveFilter(andExpression, carbonTable.getAbsoluteTableIdentifier)
     val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, resolveFilter)
-    val segment = new Segment("0")
+    val segment = new Segment("0", new TableStatusReadCommittedScope(carbonTable
+      .getAbsoluteTableIdentifier, new Configuration(false)))
     // get the pruned blocklets
     val prunedBlocklets = exprWrapper.prune(List(segment).asJava, null)
     prunedBlocklets.asScala.foreach { blocklet =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 08daa34..1b181bc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -59,9 +59,11 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
     val writer =
       if (persistSchema) {
         builder.persistSchemaFile(true)
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+          sqlContext.sparkContext.hadoopConfiguration)
       } else {
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+          sqlContext.sparkContext.hadoopConfiguration)
       }
     var i = 0
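The `Segment` construction above is the other side of the same change: a segment is now created against a `ReadCommittedScope` that carries the file-access Configuration. A sketch (method name hypothetical; the constructor calls are the ones in the updated test, and the checked `IOException` is assumed from the scope constructor):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;

public class SegmentScopeSketch {
  // Build segment "0" with a table-status-backed scope, as the test does.
  static Segment segmentZero(AbsoluteTableIdentifier identifier) throws IOException {
    return new Segment("0",
        new TableStatusReadCommittedScope(identifier, new Configuration(false)));
  }
}
```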
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 0b6813f..a03a5eb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -142,7 +142,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       .outputPath(writerPath)
       .isTransactionalTable(false)
       .uniqueIdentifier(System.currentTimeMillis)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema),
+        sqlContext.sparkContext.hadoopConfiguration)
     } else {
       if (options != null) {
         builder.outputPath(writerPath)
@@ -150,14 +151,16 @@
           .isTransactionalTable(false)
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-          .buildWriterForCSVInput(Schema.parseJson(schema))
+          .buildWriterForCSVInput(Schema.parseJson(schema),
+            sqlContext.sparkContext.hadoopConfiguration)
       } else {
         builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             System.currentTimeMillis).withBlockSize(2)
-          .buildWriterForCSVInput(Schema.parseJson(schema))
+          .buildWriterForCSVInput(Schema.parseJson(schema),
+            sqlContext.sparkContext.hadoopConfiguration)
       }
     }
     var i = 0
@@ -194,7 +197,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     builder.outputPath(writerPath)
       .isTransactionalTable(false)
       .uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields),
+        sqlContext.sparkContext.hadoopConfiguration)
     var i = 0
     while (i < rows) {
@@ -228,7 +232,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       .sortBy(sortColumns.toArray)
       .uniqueIdentifier(
         123).withBlockSize(2)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema),
+        sqlContext.sparkContext.hadoopConfiguration)
     var i = 0
     while (i < rows) {
       writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -992,7 +997,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val builder: CarbonWriterBuilder = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options)
-    val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+    val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"));
     writer.close()
@@ -1117,7 +1123,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -2091,7 +2098,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     assert(intercept[RuntimeException] {
       val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
       writer.write(record)
       writer.close()
     }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
@@ -2131,7 +2139,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2169,7 +2178,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder.sortBy(Array("id"))
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2213,7 +2223,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2253,7 +2264,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2299,7 +2311,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2346,7 +2359,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2366,7 +2380,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val writer: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
       .withTableProperties(options)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
     writer.write(Array("carbon", "1"))
     writer.write(Array("hydrogen", "10"))
     writer.write(Array("boron", "4"))
@@ -2384,7 +2398,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     // write local sort data
     val writer1: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
     writer1.write(Array("carbon", "1"))
     writer1.write(Array("hydrogen", "10"))
     writer1.write(Array("boron", "4"))
@@ -2493,7 +2507,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     fields(0) = new Field("name", DataTypes.STRING)
     fields(1) = new Field("surname", DataTypes.STRING)
     fields(2) = new Field("age", DataTypes.INT)
-    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+      sqlContext.sparkContext.hadoopConfiguration)
     var i = 0
     while (i < 100) { {
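The JSON writer path changes the same way. A self-contained sketch of the new `buildWriterForJsonInput(Schema, Configuration)` signature (the output path is arbitrary; the JSON-string `write` call mirrors the test below):

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class JsonWriterWithConfSketch {
  public static void main(String[] args) throws Exception {
    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    // buildWriterForJsonInput now also takes the Hadoop Configuration.
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/carbon-json-out")
        .isTransactionalTable(false)
        .buildWriterForJsonInput(new Schema(fields), new Configuration(false));
    writer.write("{\"name\":\"robot\", \"age\":1}");
    writer.close();
  }
}
```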
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index ff5c062..17aae1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -98,7 +98,8 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
       .outputPath(writerPath).isTransactionalTable(false)
       .uniqueIdentifier(System.currentTimeMillis())
       .withLoadOptions(options)
-      .buildWriterForJsonInput(carbonSchema)
+      .buildWriterForJsonInput(carbonSchema,
+        sqlContext.sparkContext.hadoopConfiguration)
     writer.write(jsonRow)
     writer.close()
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index dc13b16..e7fcf95 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -93,7 +93,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -140,7 +140,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, mySchema)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -184,7 +184,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json, mySchema)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -215,7 +215,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -245,7 +245,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -275,7 +275,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -305,7 +305,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -335,7 +335,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -365,7 +365,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
     val exception1 = intercept[UnsupportedOperationException] {
       val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
       writer.write(record)
       writer.close()
     }
@@ -402,7 +402,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -438,7 +438,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -476,7 +476,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -509,7 +509,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -568,7 +568,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -663,7 +663,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -716,7 +716,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -786,7 +786,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("struct_field_decimal", genericByteArray)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -858,7 +858,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("dec_fields", genericByteArray)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -905,7 +905,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false)
+      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -952,7 +953,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false)
+      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -997,7 +999,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("id", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1042,7 +1044,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1087,7 +1089,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1128,7 +1130,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1170,7 +1172,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("dec_field", bytes)
     val exception1 = intercept[Exception] {
       val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
       writer.write(avroRec)
       writer.close()
     }
@@ -1220,7 +1222,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -1256,7 +1258,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -1306,7 +1308,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef

     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " +
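Every hunk in the Avro test above makes the same mechanical change, so one sketch covers them all: build an `AvroCarbonWriter` with the new `Configuration` argument. Outside Spark a plain `Configuration` works, while the tests pass `sqlContext.sparkContext.hadoopConfiguration` so HDFS/S3 settings carry over. The schema and path here are illustrative only.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.sdk.file.CarbonWriter;

public class AvroWriterWithConfSketch {
  public static void main(String[] args) throws Exception {
    String avroSchema =
        "{\"type\":\"record\",\"name\":\"person\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"id\",\"type\":\"int\"}]}";
    Schema nn = new Schema.Parser().parse(avroSchema);

    GenericData.Record record = new GenericData.Record(nn);
    record.put("name", "robot");
    record.put("id", 1);

    // The Configuration argument is the new required parameter.
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/carbon-avro-out")
        .isTransactionalTable(false)
        .buildWriterForAvroInput(nn, new Configuration(false));
    writer.write(record);
    writer.close();
  }
}
```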
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 62ba03e..0421ea8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -68,11 +68,13 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
       CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true)
         .localDictionaryThreshold(2000)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
     } else {
       CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
     }
     var i = 0
     while (i < rows) {
@@ -268,7 +270,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
       """.stripMargin
     val pschema= org.apache.avro.Schema.parse(mySchema)
     val records = testUtil.jsonToAvro(jsonvalue, mySchema)
-    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
+    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(records)
     writer.close()
     sql("DROP TABLE IF EXISTS sdkOutputTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 4e2197d..a8bdb31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.testsuite.createTable
 import java.io.File

 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.SparkSession

 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -58,9 +59,11 @@ object TestSparkCarbonFileFormatWithSparkSession {
     val writer =
       if (persistSchema) {
         builder.persistSchemaFile(true)
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+          Configuration(false))
       } else {
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+          Configuration(false))
       }
     var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index 39785a3..6e8e79b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -18,9 +18,10 @@ package org.apache.carbondata.spark.testsuite.dataload

 import scala.collection.JavaConverters._
-
 import java.io.{File, FilenameFilter}

+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.reader.CarbonIndexFileReader
 import org.apache.carbondata.core.util.CarbonProperties
@@ -64,7 +65,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     } else {
       val segment = Segment.getSegment("0", carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
     }
     for (carbonIndexPath <- carbonIndexPaths) {
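A sketch of the `SegmentFileStore.readIndexFiles` change the tests above and below adapt to, assuming the class lives in `org.apache.carbondata.core.metadata` and that the calls throw a checked `IOException` as in this release line; the helper name is hypothetical:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.SegmentFileStore;

public class IndexFileReadSketch {
  // readIndexFiles now needs the Configuration used to open the index files.
  static void printIndexFileCounts(String tablePath, String segmentFileName)
      throws IOException {
    SegmentFileStore store = new SegmentFileStore(tablePath, segmentFileName);
    store.readIndexFiles(new Configuration(false));
    store.getIndexFilesMap()
        .forEach((indexFile, blocks) -> System.out.println(indexFile + " -> " + blocks.size()));
  }
}
```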
       assertResult(Math.max(4, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 046a2a6..a4bc6f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -22,12 +22,13 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -71,7 +72,7 @@ class CGDataMapFactory(
     val files = file.listFiles()
     files.map {f =>
       val dataMap: CoarseGrainDataMap = new CGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
       dataMap
     }.toList.asJava
   }
@@ -83,7 +84,8 @@ class CGDataMapFactory(
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: CoarseGrainDataMap = new CGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath, new
+      Configuration(false)))
     Seq(dataMap).asJava
   }
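The same threading applies to datamap initialization: DataMapModel gained a second constructor argument carrying the Configuration down into init. A minimal sketch; the index path is illustrative (in the test it comes from the writer output or a BlockletDataMapDistributable):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.core.datamap.dev.DataMapModel

// Two-argument constructor introduced by this patch: the index location plus
// the Configuration that the datamap should use to read it.
val model = new DataMapModel("/tmp/carbon-store/dm/idx0", new Configuration(false))
// A CoarseGrainDataMap/FineGrainDataMap implementation is then initialized
// with: dataMap.init(model)
```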
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index b13582b..57b3672 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -22,12 +22,14 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -68,7 +70,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
     val files = file.listFiles()
     files.map { f =>
       val dataMap: FineGrainDataMap = new FGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
       dataMap
     }.toList.asJava
   }
@@ -79,7 +81,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: FineGrainDataMap = new FGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath, new Configuration(false)))
     Seq(dataMap).asJava
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 8ebed1f..edd3e9c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -18,9 +18,9 @@ package org.apache.carbondata.spark.testsuite.datamap
 import scala.collection.JavaConverters._
-
 import java.io.{File, FilenameFilter}
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -337,7 +337,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     } else {
       val segment = Segment.getSegment("0", path)
       val store = new SegmentFileStore(path, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
       assertResult(true)(size > 0)
     }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 133454a..f4c725e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.partition
 import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
@@ -78,7 +79,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
     } else {
       val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index e3e8e68..9a0080c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -21,6 +21,7 @@ import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors}
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.optimizer.CarbonFilters
@@ -352,7 +353,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
     val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
-    store.readIndexFiles()
+    store.readIndexFiles(new Configuration(false))
     store.getIndexFiles
     assert(store.getIndexFiles.size() == 10)
     CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index af05613..3a650ec 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -117,7 +117,7 @@ class CarbonFileIndex(
       }
       CarbonInputFormat.setReadCommittedScope(
         hadoopConf,
-        new LatestFilesReadCommittedScope(indexFiles))
+        new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
       filter match {
         case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
         case None => None

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index a5e1b39..62d9903 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -78,9 +78,10 @@ class SparkCarbonFileFormat extends FileFormat
   override def inferSchema(sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
+    val conf = sparkSession.sessionState.newHadoopConf()
     val tablePath = options.get("path") match {
       case Some(path) =>
-        FileFactory.checkAndAppendDefaultFs(path, sparkSession.sparkContext.hadoopConfiguration)
+        FileFactory.checkAndAppendDefaultFs(path, conf)
       case _ if files.nonEmpty =>
         FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
       case _ =>
@@ -89,7 +90,8 @@ class SparkCarbonFileFormat extends FileFormat
     if (options.get(CarbonCommonConstants.SORT_COLUMNS).isDefined) {
       throw new UnsupportedOperationException("Cannot use sort columns during infer schema")
     }
-    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""),
+      false, conf)
     val table = CarbonTable.buildFromTableInfo(tableInfo)
     var schema = new StructType
     val fields = tableInfo.getFactTable.getListOfColumns.asScala.map { col =>
@@ -395,7 +397,7 @@ class SparkCarbonFileFormat extends FileFormat
       vectorizedReader
     } else {
       val reader = new CarbonRecordReader(model,
-        new SparkUnsafeRowReadSuport(requiredSchema), null)
+        new SparkUnsafeRowReadSuport(requiredSchema), broadcastedHadoopConf.value.value)
       reader.initialize(split, hadoopAttemptContext)
       reader
     }
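Note the pattern in SparkCarbonFileFormat: instead of reaching for sparkContext.hadoopConfiguration, inferSchema derives a Configuration from the session and threads it through SchemaReader and LatestFilesReadCommittedScope, while the record reader picks up the broadcast copy on executors. A sketch of the session-side half, assuming a live SparkSession (app name and master are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("carbon-conf-demo") // illustrative app
  .master("local[1]")
  .getOrCreate()
// newHadoopConf() layers session options (spark.hadoop.*) on top of
// sparkContext.hadoopConfiguration, so per-session overrides reach Carbon.
val conf = spark.sessionState.newHadoopConf()
println(conf.get("fs.defaultFS")) // e.g. file:/// locally
```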
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 66c0224..825cdec 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -868,7 +868,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     builder.outputPath(writerPath)
       .isTransactionalTable(false)
       .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
     var i = 0
     while (i < rows) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 26f67f8..43f04b8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -22,8 +22,8 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang.RandomStringUtils
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.apache.spark.util.SparkUtil
+import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
@@ -70,9 +70,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     val writer = if (persistSchema) {
       builder.persistSchemaFile(true)
-      builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+      builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+        .sparkContext.hadoopConfiguration)
     } else {
-      builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+      builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+        .sparkContext.hadoopConfiguration)
     }
     var i = 0
@@ -333,7 +335,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema), spark.sessionState.newHadoopConf())
     for (i <- 0 until 3) {
       // write a varchar with 75,000 length
       writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
@@ -348,15 +350,12 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
            |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
          .stripMargin)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
            |OPTIONS("long_String_columns"="address") LOCATION
            |'$writerPath' """.stripMargin)
-    } else {
-      // TODO. spark2.3 ?
-      assert(false)
     }
     assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
     val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
@@ -371,14 +370,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       .sql(
         s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
            |'$writerPath', "long_String_columns" "address") """.stripMargin)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
            |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
-    } else {
-      // TODO. spark2.3 ?
-      assert(false)
     }
     assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
     val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0fd4e34..57887a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -436,7 +436,7 @@ object CarbonDataRDDFactory {
       res.foreach { resultOfSeg =>
         resultSize = resultSize + resultOfSeg.size
         resultOfSeg.foreach { resultOfBlock =>
-          segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
+          segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
         }
       }
       val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index b77632d..4921b33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -159,14 +159,13 @@
       resultOfBlock => {
         if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
           blockUpdateDetailsList.add(resultOfBlock._2._1)
-          segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null))
+          segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
           // if this block is invalid then decrement block count in map.
           if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
             CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
               blockMappingVO.getSegmentNumberOfBlockMapping)
           }
-        }
-        else {
+        } else {
           // In case of failure , clean all related delete delta files
           CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
           LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index e3da86d..2951283 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1202,7 +1202,7 @@ public final class CarbonDataMergerUtil {
     segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
     CarbonFile[] deleteDeltaFiles =
-        segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName);
+        segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg), blockName);
     String destFileName =
         blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 19353d1..f6cc485 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -326,7 +326,7 @@ public final class CarbonLoaderUtil {
         for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
           // if the segments is in the list of marked for delete then update the status.
-          if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
+          if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName()))) {
             detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
           } else if (segmentFilesTobeUpdated
               .contains(Segment.toSegment(detail.getLoadName(), null))) {
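The four hunks above (CarbonDataRDDFactory, DeleteExecution, CarbonDataMergerUtil, CarbonLoaderUtil) all replace `new Segment(name, null)` with a one-argument constructor added for the common case of a segment without a segment file. A sketch of the intended equivalence, assuming the new overload simply defaults the segment file name to null:

```scala
import org.apache.carbondata.core.datamap.Segment

val byName = new Segment("0") // convenience constructor used by this patch
val explicit = new Segment("0", null) // what the call sites previously wrote
// Both identify load segment "0" with no associated segment file.
assert(byName.getSegmentNo == explicit.getSegmentNo)
```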
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index dd70cc9..a183197 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -74,8 +74,7 @@ public class AvroCarbonWriter extends CarbonWriter {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonTable.class.getName());
-  AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration hadoopConf = new Configuration();
+  AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
     CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
     CarbonTableOutputFormat format = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 627e060..a8899a7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -46,8 +46,7 @@ class CSVCarbonWriter extends CarbonWriter {
   private TaskAttemptContext context;
   private ObjectArrayWritable writable;
-  CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration hadoopConf = new Configuration();
+  CSVCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
     CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
     CarbonTableOutputFormat format = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
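Finally, the SDK writers themselves stop calling `new Configuration()` in their constructors and accept the instance handed down from CarbonWriterBuilder, so caller-side filesystem settings are no longer discarded. A hedged sketch of what that enables, using illustrative S3-style properties and paths:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

val conf = new Configuration(false)
// Illustrative credentials: these now reach CSVCarbonWriter/AvroCarbonWriter,
// which previously replaced them with a fresh default Configuration.
conf.set("fs.s3a.access.key", "<access-key>")
conf.set("fs.s3a.secret.key", "<secret-key>")
val writer = CarbonWriter.builder()
  .outputPath("s3a://my-bucket/carbon-out") // illustrative path
  .isTransactionalTable(false)
  .buildWriterForCSVInput(Schema.parseJson("""[{"name":"string"}]"""), conf)
```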
