Repository: carbondata Updated Branches: refs/heads/master 28c94183b -> 694ee774c
[CARBONDATA-1880] Combine input small files for GLOBAL_SORT Combine input small files for GLOBAL_SORT to avoid carbon small file issue This closes #1669 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/694ee774 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/694ee774 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/694ee774 Branch: refs/heads/master Commit: 694ee774cb8ca55ee24cd906368c3cf9cc96b0eb Parents: 28c9418 Author: QiangCai <[email protected]> Authored: Fri Dec 15 22:02:28 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Dec 20 14:58:37 2017 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 + .../carbondata/core/util/CarbonProperties.java | 14 ++ .../hadoop/CarbonMultiBlockSplit.java | 17 ++- ...ompactionSupportGlobalSortFunctionTest.scala | 4 +- ...mpactionSupportGlobalSortParameterTest.scala | 5 - .../MajorCompactionIgnoreInMinorTest.scala | 16 +- .../dataload/TestGlobalSortDataLoad.scala | 33 ++++- .../testsuite/datamap/DataMapWriterSuite.scala | 1 + .../load/DataLoadProcessBuilderOnSpark.scala | 148 ++++++++++++++++--- .../load/DataLoadProcessorStepOnSpark.scala | 7 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 121 +++++++++++---- .../apache/spark/sql/util/CarbonException.scala | 8 +- .../apache/spark/sql/util/SparkSQLUtil.scala | 25 ++++ .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../sql/CarbonDatasourceHadoopRelation.scala | 2 +- 15 files changed, 333 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index f67b0c5..9534099 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1277,6 +1277,10 @@ public final class CarbonCommonConstants { public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution"; public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false"; + @CarbonProperty + public static final String CARBON_COMBINE_SMALL_INPUT_FILES = "carbon.mergeSmallFileRead.enable"; + public static final String CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT = "false"; + public static final int DICTIONARY_DEFAULT_CARDINALITY = 1; @CarbonProperty public static final String SPARK_SCHEMA_STRING_LENGTH_THRESHOLD = http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index fe396cb..11aea99 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -106,6 +106,7 @@ public final class CarbonProperties { validateLockType(); validateCarbonCSVReadBufferSizeByte(); validateHandoffSize(); + validateCombineSmallInputFiles(); } private void validateCarbonCSVReadBufferSizeByte() { @@ -205,6 +206,19 @@ public final class CarbonProperties { } } + private void validateCombineSmallInputFiles() { + String combineSmallInputFilesStr = + carbonProperties.getProperty(CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES); + boolean isValidBooleanValue = CarbonUtil.validateBoolean(combineSmallInputFilesStr); + if (!isValidBooleanValue) { + LOGGER.warn("The combine small files value \"" + combineSmallInputFilesStr + + "\" is invalid. Using the default value \"" + + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT); + carbonProperties.setProperty(CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES, + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT); + } + } + private void validateEnableUnsafeSort() { String unSafeSortStr = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT); boolean isValidBooleanValue = CarbonUtil.validateBoolean(unSafeSortStr); http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java index 96fe909..aed3449 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -47,15 +47,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { private FileFormat fileFormat = FileFormat.COLUMNAR_V3; + private long length; + public CarbonMultiBlockSplit() { splitList = null; locations = null; + length = 0; } public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList, String[] locations) throws IOException { this.splitList = splitList; this.locations = locations; + calculateLength(); } public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList, @@ -63,6 +67,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { this.splitList = splitList; this.locations = locations; this.fileFormat = fileFormat; + calculateLength(); } /** @@ -75,11 +80,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { @Override public long getLength() throws IOException, InterruptedException { + return length; + } + + public void setLength(long length) { + this.length = length; + } + + private void calculateLength() { long total = 0; - for (InputSplit split: splitList) { + for (CarbonInputSplit split : splitList) { total += split.getLength(); } - return total; + length = total; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala index 4958f55..9014edb 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala @@ -439,7 +439,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf } sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") - assert(getIndexFileCount("compaction_globalsort", "0.1") === 3) + assert(getIndexFileCount("compaction_globalsort", "0.1") === 2) checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72))) checkAnswer(sql("SELECT * FROM compaction_globalsort order by name, id"), sql("SELECT * FROM carbon_localsort order by name, id")) @@ -454,7 +454,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf } sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") - assert(getIndexFileCount("compaction_globalsort", "0.1") === 3) + assert(getIndexFileCount("compaction_globalsort", "0.1") === 2) checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72))) checkAnswer(sql("SELECT * FROM compaction_globalsort order by name, id"), sql("SELECT * FROM carbon_localsort order by name, id")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala index f9959fa..02c602a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala @@ -17,8 +17,6 @@ package org.apache.carbondata.spark.testsuite.datacompaction -import java.io.{File, FilenameFilter} - import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.spark.sql.Row @@ -26,10 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.path.CarbonStorePath class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { val filePath: String = s"$resourcesPath/globalsort" http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index ed63fdf..61de615 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -59,16 +59,16 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" ) // compaction will happen here. - sql("alter table ignoremajor compact 'major'" - ) - sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" + + sql("alter table ignoremajor compact 'major'") + + sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" - ) - sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor OPTIONS" + + ) + sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor OPTIONS" + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" - ) - sql("alter table ignoremajor compact 'minor'" - ) + ) + sql("alter table ignoremajor compact 'minor'" + ) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 6bbc763..9ce9675 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -17,6 +17,10 @@ package org.apache.carbondata.spark.testsuite.dataload +import java.io.{File, FileWriter} + +import org.apache.commons.io.FileUtils + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -105,7 +109,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " + "OPTIONS('BAD_RECORDS_ACTION'='REDIRECT')") - assert(getIndexFileCount("carbon_globalsort") === 3) + assert(getIndexFileCount("carbon_globalsort") === 2) checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11))) } @@ -115,7 +119,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " + "OPTIONS('SINGLE_PASS'='TRUE')") - assert(getIndexFileCount("carbon_globalsort") === 3) + assert(getIndexFileCount("carbon_globalsort") === 2) checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12))) checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"), sql("SELECT * FROM carbon_localsort_once ORDER BY name")) @@ -164,7 +168,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") sql("ALTER TABLE carbon_globalsort COMPACT 'MAJOR'") - assert(getIndexFileCount("carbon_globalsort") === 3) + assert(getIndexFileCount("carbon_globalsort") === 2) checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(24))) checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name, id"), sql("SELECT * FROM carbon_localsort_twice ORDER BY name, id")) @@ -223,7 +227,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") sql("DELETE FROM carbon_globalsort WHERE id = 1").show - assert(getIndexFileCount("carbon_globalsort") === 3) + assert(getIndexFileCount("carbon_globalsort") === 2) checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11))) checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name, id"), sql("SELECT * FROM carbon_localsort_delete ORDER BY name, id")) @@ -250,6 +254,27 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql("SELECT * FROM carbon_localsort_update ORDER BY name, id")) } + test("LOAD with small files") { + val inputPath = new File("target/small_files").getCanonicalPath + val folder = new File(inputPath) + if (folder.exists()) { + FileUtils.deleteDirectory(folder) + } + folder.mkdir() + for (i <- 0 to 100) { + val file = s"$folder/file$i.csv" + val writer = new FileWriter(file) + writer.write("id,name,city,age\n") + writer.write(s"$i,name_$i,city_$i,${ i % 100 }") + writer.close() + } + sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort") + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) + val segmentDir = carbonTablePath.getSegmentDir("0", "0") + assertResult(5)(new File(segmentDir).listFiles().length) + } + // ----------------------------------- INSERT INTO ----------------------------------- test("INSERT INTO") { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "GLOBAL_SORT") http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index c137fc7..f73a202 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -70,6 +70,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { sqlContext.sparkContext.parallelize(1 to numRows) .map(x => ("a", "b", x)) .toDF("c1", "c2", "c3") + .sort("c3") } def dropTable(): Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index c14e0a7..2537a0c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -17,17 +17,26 @@ package org.apache.carbondata.spark.load -import java.util.Comparator +import java.text.SimpleDateFormat +import java.util.{Comparator, Date, Locale} + +import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.{SparkContext, TaskContext} +import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} +import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} +import org.apache.spark.TaskContext import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.NewHadoopRDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.command.ExecutionErrors +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.logging.LogServiceFactory @@ -36,10 +45,11 @@ import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses} -import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable} +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.rdd.SerializableConfiguration import org.apache.carbondata.spark.util.CommonUtil /** @@ -49,7 +59,7 @@ object DataLoadProcessBuilderOnSpark { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) def loadDataUsingGlobalSort( - sc: SparkContext, + sparkSession: SparkSession, dataFrame: Option[DataFrame], model: CarbonLoadModel, hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { @@ -57,21 +67,13 @@ object DataLoadProcessBuilderOnSpark { dataFrame.get.rdd } else { // input data from files - CommonUtil.configureCSVInputFormat(hadoopConf, model) - hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath) val columnCount = model.getCsvHeaderColumns.length - val jobConf = new JobConf(hadoopConf) - SparkHadoopUtil.get.addCredentials(jobConf) - new NewHadoopRDD[NullWritable, StringArrayWritable]( - sc, - classOf[CSVInputFormat], - classOf[NullWritable], - classOf[StringArrayWritable], - jobConf) - .map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount)) + csvFileScanRDD(sparkSession, model, hadoopConf) + .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) } model.setPartitionId("0") + val sc = sparkSession.sparkContext val modelBroadcast = sc.broadcast(model) val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") @@ -160,4 +162,112 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * creates a RDD that does reading of multiple CSV files + */ + def csvFileScanRDD( + spark: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration + ): RDD[InternalRow] = { + // 1. partition + val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes + val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes + val defaultParallelism = spark.sparkContext.defaultParallelism + CommonUtil.configureCSVInputFormat(hadoopConf, model) + hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val jobContext = new JobContextImpl(jobConf, null) + val inputFormat = new CSVInputFormat() + val rawSplits = inputFormat.getSplits(jobContext).toArray + val splitFiles = rawSplits.map { split => + val fileSplit = split.asInstanceOf[FileSplit] + PartitionedFile( + InternalRow.empty, + fileSplit.getPath.toString, + fileSplit.getStart, + fileSplit.getLength, + fileSplit.getLocations) + }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + val totalBytes = splitFiles.map(_.length + openCostInBytes).sum + val bytesPerCore = totalBytes / defaultParallelism + + val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + val newPartition = + FilePartition( + partitions.size, + currentFiles.toArray.toSeq) + partitions += newPartition + } + currentFiles.clear() + currentSize = 0 + } + + splitFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file + } + closePartition() + + // 2. read function + val serializableConfiguration = new SerializableConfiguration(jobConf) + val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable { + override def apply(file: PartitionedFile): Iterator[InternalRow] = { + new Iterator[InternalRow] { + val hadoopConf = serializableConfiguration.value + val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) + formatter.format(new Date()) + } + val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) + val inputSplit = + new FileSplit(new Path(file.filePath), file.start, file.length, file.locations) + var finished = false + val inputFormat = new CSVInputFormat() + val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext) + reader.initialize(inputSplit, hadoopAttemptContext) + + override def hasNext: Boolean = { + if (!finished) { + if (reader != null) { + if (reader.nextKeyValue()) { + true + } else { + finished = true + reader.close() + false + } + } else { + finished = true + false + } + } else { + false + } + } + + override def next(): InternalRow = { + new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]]) + } + } + } + } + new FileScanRDD(spark, readFunction, partitions) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 5e6ba98..154d3ed 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -23,6 +23,8 @@ import com.univocity.parsers.common.TextParsingException import org.apache.spark.{Accumulator, SparkEnv, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException @@ -30,7 +32,6 @@ import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations} import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl @@ -45,9 +46,9 @@ import org.apache.carbondata.spark.util.Util object DataLoadProcessorStepOnSpark { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def toStringArrayRow(row: StringArrayWritable, columnCount: Int): StringArrayRow = { + def toStringArrayRow(row: InternalRow, columnCount: Int): StringArrayRow = { val outRow = new StringArrayRow(new Array[String](columnCount)) - outRow.setValues(row.get()) + outRow.setValues(row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]]) } def toRDDIterator( http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index e58bfd4..09dbd71 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -31,6 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.hive.DistributionUtil +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -55,7 +57,7 @@ import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl * level filtering in driver side. */ class CarbonScanRDD( - @transient sc: SparkContext, + @transient spark: SparkSession, val columnProjection: CarbonProjection, var filterExpression: Expression, identifier: AbsoluteTableIdentifier, @@ -63,7 +65,7 @@ class CarbonScanRDD( @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics, @transient val partitionNames: Seq[String]) - extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) { + extends CarbonRDDWithTableInfo[InternalRow](spark.sparkContext, Nil, serializedTableInfo) { private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") private val jobTrackerId: String = { @@ -186,31 +188,74 @@ class CarbonScanRDD( } } noOfNodes = nodeBlockMapping.size - } else { - if (CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION, - CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) { - // Use blocklet distribution - // Randomize the blocklets for better shuffling - Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex => - val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, - Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, - splitWithIndex._1.getLocations) - val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) - result.add(partition) + } else if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION, + CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) { + // Use blocklet distribution + // Randomize the blocklets for better shuffling + Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, + splitWithIndex._1.getLocations) + val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) + result.add(partition) + } + } else if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES, + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT).toBoolean) { + + // sort blocks in reverse order of length + val blockSplits = splits + .asScala + .map(_.asInstanceOf[CarbonInputSplit]) + .groupBy(f => f.getBlockPath) + .map { blockSplitEntry => + new CarbonMultiBlockSplit(identifier, + blockSplitEntry._2.asJava, + blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray) + }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse) + + val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes + val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes + val defaultParallelism = spark.sparkContext.defaultParallelism + val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum + val bytesPerCore = totalBytes / defaultParallelism + + val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit] + var currentSize = 0L + + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + result.add(combineSplits(currentFiles, currentSize, result.size())) } - } else { - // Use block distribution - splits.asScala.map(_.asInstanceOf[CarbonInputSplit]). - groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex => - val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, - splitWithIndex._1.asJava, - splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray) - val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) - result.add(partition) + currentFiles.clear() + currentSize = 0 + } + + blockSplits.foreach { file => + if (currentSize + file.getLength > maxSplitBytes) { + closePartition() } + // Add the given file to the current partition. + currentSize += file.getLength + openCostInBytes + currentFiles += file + } + closePartition() + } else { + // Use block distribution + splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) + .groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + splitWithIndex._1.asJava, + splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray) + val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) + result.add(partition) } } @@ -232,6 +277,32 @@ class CarbonScanRDD( result.asScala } + def combineSplits( + splits: ArrayBuffer[CarbonMultiBlockSplit], + size: Long, + partitionId: Int + ): CarbonSparkPartition = { + val carbonInputSplits = splits.flatMap(_.getAllSplits.asScala) + + // Computes total number of bytes can be retrieved from each host. + val hostToNumBytes = mutable.HashMap.empty[String, Long] + splits.foreach { split => + split.getLocations.filter(_ != "localhost").foreach { host => + hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + split.getLength + } + } + // Takes the first 3 hosts with the most data to be retrieved + val locations = hostToNumBytes + .toSeq + .sortBy(_._2)(implicitly[Ordering[Long]].reverse) + .take(3) + .map(_._1) + .toArray + + val multiBlockSplit = new CarbonMultiBlockSplit(null, carbonInputSplits.asJava, locations) + new CarbonSparkPartition(id, partitionId, multiBlockSplit) + } + override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val queryStartTime = System.currentTimeMillis val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala index 9fd7099..7dabddb 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.util - import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.AnalysisException - object CarbonException { - def analysisException(message: String): Nothing = throw new AnalysisException(message) - } +object CarbonException { + def analysisException(message: String): Nothing = throw new AnalysisException(message) +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala new file mode 100644 index 0000000..370f80c --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SessionState + +object SparkSQLUtil { + def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 0b786b5..72c979a 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -342,7 +342,7 @@ object CarbonDataRDDFactory { status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf) } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { - DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext, + DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession, dataFrame, carbonLoadModel, hadoopConf) } else if (dataFrame.isDefined) { loadDataFrame(sqlContext, dataFrame, carbonLoadModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 148fca8..ca0c51d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -76,7 +76,7 @@ case class CarbonDatasourceHadoopRelation( requiredColumns.foreach(projection.addColumn) val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics new CarbonScanRDD( - sparkSession.sparkContext, + sparkSession, projection, filterExpression.orNull, identifier,
