Repository: carbondata Updated Branches: refs/heads/master 937868d1b -> b4dc866fe
[CARBONDATA-2036] Fix the insert static partition with integer values prefix with 0 not working. Problem: INSERT OVERWRITE into a static partition did not work when an int partition value had a leading 0. Example: CREATE TABLE test(d1 string) PARTITIONED BY (c1 int, c2 int, c3 int), then INSERT OVERWRITE TABLE test PARTITION(c1=01, c2=02, c3=03) SELECT s1. In this case the value 01 was not converted to an actual integer (1) before being written to the partition map file, so the overwrite did not match the existing partition. Solution: convert the partition values to the corresponding column data type before adding them to the partition file. This closes #1833 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b4dc866f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b4dc866f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b4dc866f Branch: refs/heads/master Commit: b4dc866fec0a42196435c6da0e1413dc2a2398d1 Parents: 937868d Author: ravipesala <[email protected]> Authored: Thu Jan 18 18:51:50 2018 +0530 Committer: kumarvishal <[email protected]> Committed: Sat Jan 20 00:14:24 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 38 +++- .../core/indexstore/UnsafeMemoryDMStore.java | 22 ++- .../carbondata/core/util/CarbonProperties.java | 52 ++--- .../core/CarbonPropertiesValidationTest.java | 2 +- ...tandardPartitionTableOverwriteTestCase.scala | 20 ++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 189 ++++++++++--------- .../carbondata/spark/util/CarbonScalaUtil.scala | 10 +- .../management/CarbonLoadDataCommand.scala | 29 ++- .../CarbonProjectForUpdateCommand.scala | 2 +- .../datasources/CarbonFileFormat.scala | 61 ++---- .../apache/spark/sql/hive/CarbonMetaStore.scala | 4 +- .../src/main/spark2.1/CarbonSQLConf.scala | 12 +- .../src/main/spark2.2/CarbonSqlConf.scala | 12 +- 13 files changed, 246 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index cd7abe0..13c8a42 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1299,11 +1299,6 @@ public final class CarbonCommonConstants { @CarbonProperty public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution"; - public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false"; - - @CarbonProperty - public static final String CARBON_COMBINE_SMALL_INPUT_FILES = "carbon.mergeSmallFileRead.enable"; - public static final String CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT = "false"; public static final int DICTIONARY_DEFAULT_CARDINALITY = 1; @CarbonProperty @@ -1404,9 +1399,38 @@ public final class CarbonCommonConstants { public static final String USE_DISTRIBUTED_DATAMAP_DEFAULT = "false"; - public static final String CARBON_USE_BLOCKLET_DISTRIBUTION = "carbon.blocklet.distribution"; + /** + * This property defines how the tasks are splitted/combined and launch spark tasks during query + */ + @CarbonProperty + public static final String CARBON_TASK_DISTRIBUTION = "carbon.task.distribution"; + + /** + * It combines the available blocks as per the maximum available tasks in the cluster. + */ + public static final String CARBON_TASK_DISTRIBUTION_CUSTOM = "custom"; + + /** + * It creates the splits as per the number of blocks/carbondata files available for query. 
+ */ + public static final String CARBON_TASK_DISTRIBUTION_BLOCK = "block"; + + /** + * It creates the splits as per the number of blocklets available for query. + */ + public static final String CARBON_TASK_DISTRIBUTION_BLOCKLET = "blocklet"; + + /** + * It merges all the small files and create tasks as per the configurable partition size. + */ + public static final String CARBON_TASK_DISTRIBUTION_MERGE_FILES = "merge_small_files"; + + /** + * Default task distribution. + */ + public static final String CARBON_TASK_DISTRIBUTION_DEFAULT = CARBON_TASK_DISTRIBUTION_BLOCK; + - public static final String CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT = "true"; /** * The property to configure the mdt file folder path, earlier it was pointing to the * fixed carbon store path. This is needed in case of the federation setup when user removes http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index dc630ff..31ecac2 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -36,7 +36,7 @@ public class UnsafeMemoryDMStore { private MemoryBlock memoryBlock; - private static int capacity = 8 * 1024 * 1024; + private static int capacity = 8 * 1024; private int allocatedSize; @@ -66,14 +66,8 @@ public class UnsafeMemoryDMStore { * @param rowSize */ private void ensureSize(int rowSize) throws MemoryException { - if (runningLength + rowSize >= allocatedSize) { - MemoryBlock allocate = - UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity); - getUnsafe().copyMemory(memoryBlock.getBaseObject(), 
memoryBlock.getBaseOffset(), - allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); - allocatedSize = allocatedSize + capacity; - memoryBlock = allocate; + while (runningLength + rowSize >= allocatedSize) { + increaseMemory(); } if (this.pointers.length <= rowCount + 1) { int[] newPointer = new int[pointers.length + 1000]; @@ -82,6 +76,16 @@ public class UnsafeMemoryDMStore { } } + private void increaseMemory() throws MemoryException { + MemoryBlock allocate = + UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity); + getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), + allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); + allocatedSize = allocatedSize + capacity; + memoryBlock = allocate; + } + /** * Add the index row to unsafe. * http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index a918611..fd78efc 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -35,25 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.BLOCKLET_SIZE; -import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATA_FILE_VERSION; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATE_FORMAT; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_VECTOR_READER; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.HANDOFF_SIZE; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.LOCK_TYPE; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES_BLOCK_SORT; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_SIZE; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.*; import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB; import static 
org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO; @@ -151,8 +133,8 @@ public final class CarbonProperties { case HANDOFF_SIZE: validateHandoffSize(); break; - case CARBON_COMBINE_SMALL_INPUT_FILES: - validateCombineSmallInputFiles(); + case CARBON_TASK_DISTRIBUTION: + validateCarbonTaskDistribution(); break; // The method validate the validity of configured carbon.timestamp.format value // and reset to default value if validation fail @@ -199,7 +181,7 @@ public final class CarbonProperties { validateLockType(); validateCarbonCSVReadBufferSizeByte(); validateHandoffSize(); - validateCombineSmallInputFiles(); + validateCarbonTaskDistribution(); // The method validate the validity of configured carbon.timestamp.format value // and reset to default value if validation fail validateTimeFormatKey(CARBON_TIMESTAMP_FORMAT, @@ -361,22 +343,24 @@ public final class CarbonProperties { if (!isValidBooleanValue) { LOGGER.warn("The custom block distribution value \"" + customBlockDistributionStr + "\" is invalid. 
Using the default value \"" - + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT); - carbonProperties.setProperty(CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT); + + false); + carbonProperties.setProperty(CARBON_CUSTOM_BLOCK_DISTRIBUTION, "false"); } } - private void validateCombineSmallInputFiles() { - String combineSmallInputFilesStr = - carbonProperties.getProperty(CARBON_COMBINE_SMALL_INPUT_FILES); - boolean isValidBooleanValue = CarbonUtil.validateBoolean(combineSmallInputFilesStr); - if (!isValidBooleanValue) { - LOGGER.warn("The combine small files value \"" + combineSmallInputFilesStr + private void validateCarbonTaskDistribution() { + String carbonTaskDistribution = carbonProperties.getProperty(CARBON_TASK_DISTRIBUTION); + boolean isValid = carbonTaskDistribution != null && ( + carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_MERGE_FILES) + || carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_BLOCKLET) + || carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_BLOCK) + || carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_CUSTOM)); + if (!isValid) { + LOGGER.warn("The carbon task distribution value \"" + carbonTaskDistribution + "\" is invalid. 
Using the default value \"" - + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT); - carbonProperties.setProperty(CARBON_COMBINE_SMALL_INPUT_FILES, - CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT); + + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT); + carbonProperties.setProperty(CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java index ae29b03..daf6db0 100644 --- a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java +++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java @@ -77,7 +77,7 @@ public class CarbonPropertiesValidationTest extends TestCase { String valueAfterValidation = carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION); assertTrue(valueBeforeValidation.equals(valueAfterValidation)); - assertTrue(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT + assertTrue("false" .equalsIgnoreCase(valueAfterValidation)); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala index 15126b6..4104ea3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala @@ -155,6 +155,25 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf checkAnswer(sql("select count(*) from weather6"), Seq(Row(2))) } + test("Test overwrite static partition with wrong int value") { + sql( + """ + | CREATE TABLE weather7 (type String) + | PARTITIONED BY (year int, month int, day int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql("insert into weather7 partition(year=2014, month=05, day=25) select 'rainy'") + sql("insert into weather7 partition(year=2014, month=04, day=23) select 'cloudy'") + sql("insert overwrite table weather7 partition(year=2014, month=05, day=25) select 'sunny'") + checkExistence(sql("select * from weather7"), true, "sunny") + checkAnswer(sql("select count(*) from weather7"), Seq(Row(2))) + sql("insert into weather7 partition(year=2014, month, day) select 'rainy1',06,25") + sql("insert into weather7 partition(year=2014, month=01, day) select 'rainy2',27") + sql("insert into weather7 partition(year=2014, month=01, day=02) select 'rainy3'") + checkAnswer(sql("select count(*) from weather7 where month=1"), Seq(Row(2))) + } + override def afterAll = { dropTable @@ -168,6 +187,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("drop table if exists insertstaticpartitiondynamic") sql("drop table if exists partitionallcompaction") sql("drop table if exists weather6") + sql("drop table if exists weather7") } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index cc68b9c..a04e9e1 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -145,7 +145,9 @@ class CarbonScanRDD( statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis) statisticRecorder.recordStatisticsForDriver(statistic, queryId) statistic = new QueryStatistic() - + val carbonDistribution = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) // If bucketing is enabled on table then partitions should be grouped based on buckets. 
if (bucketedTable != null) { var i = 0 @@ -161,101 +163,106 @@ class CarbonScanRDD( i += 1 result.add(partition) } - } else if (CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) { - // create a list of block based on split - val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) - - // get the list of executors and map blocks to executors based on locality - val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext) - - // divide the blocks among the tasks of the nodes as per the data locality - val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, - parallelism, activeNodes.toList.asJava) - var i = 0 - // Create Spark Partition for each task and assign blocks - nodeBlockMapping.asScala.foreach { case (node, blockList) => - blockList.asScala.foreach { blocksPerTask => - val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit]) - if (blocksPerTask.size() != 0) { - val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node)) - val partition = new CarbonSparkPartition(id, i, multiBlockSplit) - result.add(partition) - i += 1 + } else { + val useCustomDistribution = + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, + "false").toBoolean || + carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM) + if (useCustomDistribution) { + // create a list of block based on split + val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) + + // get the list of executors and map blocks to executors based on locality + val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext) + + // divide the blocks among the tasks of the nodes as per the data locality + val nodeBlockMapping = 
CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, + parallelism, activeNodes.toList.asJava) + var i = 0 + // Create Spark Partition for each task and assign blocks + nodeBlockMapping.asScala.foreach { case (node, blockList) => + blockList.asScala.foreach { blocksPerTask => + val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit]) + if (blocksPerTask.size() != 0) { + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node)) + val partition = new CarbonSparkPartition(id, i, multiBlockSplit) + result.add(partition) + i += 1 + } } } - } - noOfNodes = nodeBlockMapping.size - } else if (CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION, - CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) { - // Use blocklet distribution - // Randomize the blocklets for better shuffling - Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex => - val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, - Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, - splitWithIndex._1.getLocations) - val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) - result.add(partition) - } - } else if (CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES, - CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT).toBoolean) { - - // sort blocks in reverse order of length - val blockSplits = splits - .asScala - .map(_.asInstanceOf[CarbonInputSplit]) - .groupBy(f => f.getBlockPath) - .map { blockSplitEntry => - new CarbonMultiBlockSplit(identifier, - blockSplitEntry._2.asJava, - blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray) - }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse) - - val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes - val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes - val 
defaultParallelism = spark.sparkContext.defaultParallelism - val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum - val bytesPerCore = totalBytes / defaultParallelism - - val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) - LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + - s"open cost is considered as scanning $openCostInBytes bytes.") - - val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit] - var currentSize = 0L - - def closePartition(): Unit = { - if (currentFiles.nonEmpty) { - result.add(combineSplits(currentFiles, currentSize, result.size())) + noOfNodes = nodeBlockMapping.size + } else if (carbonDistribution.equalsIgnoreCase( + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET)) { + // Use blocklet distribution + // Randomize the blocklets for better shuffling + Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, + splitWithIndex._1.getLocations) + val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) + result.add(partition) + } + } else if (carbonDistribution.equalsIgnoreCase( + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) { + + // sort blocks in reverse order of length + val blockSplits = splits + .asScala + .map(_.asInstanceOf[CarbonInputSplit]) + .groupBy(f => f.getBlockPath) + .map { blockSplitEntry => + new CarbonMultiBlockSplit(identifier, + blockSplitEntry._2.asJava, + blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray) + }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse) + + val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes + val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes + val defaultParallelism = spark.sparkContext.defaultParallelism + val totalBytes = blockSplits.map(_.getLength + 
openCostInBytes).sum + val bytesPerCore = totalBytes / defaultParallelism + + val maxSplitBytes = Math + .min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit] + var currentSize = 0L + + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + result.add(combineSplits(currentFiles, currentSize, result.size())) + } + currentFiles.clear() + currentSize = 0 } - currentFiles.clear() - currentSize = 0 - } - blockSplits.foreach { file => - if (currentSize + file.getLength > maxSplitBytes) { - closePartition() + blockSplits.foreach { file => + if (currentSize + file.getLength > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.getLength + openCostInBytes + currentFiles += file + } + closePartition() + } else { + // Use block distribution + splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy { f => + f.getSegmentId.concat(f.getBlockPath) + }.values.zipWithIndex.foreach { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + splitWithIndex._1.asJava, + splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray) + val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) + result.add(partition) } - // Add the given file to the current partition. 
- currentSize += file.getLength + openCostInBytes - currentFiles += file - } - closePartition() - } else { - // Use block distribution - splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) - .groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex => - val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, - splitWithIndex._1.asJava, - splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray) - val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) - result.add(partition) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 6c8a6b0..86d25b4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -177,9 +177,9 @@ object CarbonScalaUtil { return null } dataType match { - case TimestampType => + case TimestampType if timeStampFormat != null => DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000) - case DateType => + case DateType if dateFormat != null => DateTimeUtils.dateToString( (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt) case ShortType => value.toShort.toString @@ -233,16 +233,18 @@ object CarbonScalaUtil { serializationNullFormat: String, badRecordAction: String, isEmptyBadRecord: Boolean): Map[String, String] = { + val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__" partitionSpec.map{ case (col, pvalue) => // replace special string with empty value. 
- val value = if (pvalue.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) { + val value = if (pvalue == null) { + hivedefaultpartition + } else if (pvalue.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) { "" } else { pvalue } val carbonColumn = table.getColumnByName(table.getTableName, col.toLowerCase) val dataType = CarbonScalaUtil.convertCarbonToSparkDataType(carbonColumn.getDataType) - val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__" try { if (isEmptyBadRecord && value.length == 0 && badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString) && http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 9577615..6b43152 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -521,9 +521,10 @@ case class CarbonLoadDataCommand( CarbonSession.threadSet( CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, badRecordAction) + val isEmptyBadRecord = carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1) CarbonSession.threadSet( CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)) + isEmptyBadRecord) try { val query: LogicalPlan = if (dataFrame.isDefined) { val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 @@ -632,7 +633,13 @@ case class CarbonLoadDataCommand( overwrite = false, ifPartitionNotExists = false) if (isOverwriteTable && partition.nonEmpty) { - 
overwritePartition(sparkSession, table, convertedPlan) + overwritePartition( + sparkSession, + table, + convertedPlan, + serializationNullFormat, + badRecordAction, + isEmptyBadRecord.toBoolean) } else { Dataset.ofRows(sparkSession, convertedPlan) } @@ -754,11 +761,25 @@ case class CarbonLoadDataCommand( private def overwritePartition( sparkSession: SparkSession, table: CarbonTable, - logicalPlan: LogicalPlan): Unit = { + logicalPlan: LogicalPlan, + serializationNullFormat: String, + badRecordAction: String, + isEmptyBadRecord: Boolean): Unit = { val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName)) + + // Update the partitions as per the datatype expect for time and datetype as we + // expect user provides the format in standard spark/hive formats. + val updatedPartitions = CarbonScalaUtil.updatePartitions( + partition.filter(_._2.isDefined).map(f => (f._1, f._2.get)), + table, + timeFormat = null, + dateFormat = null, + serializationNullFormat, + badRecordAction, + isEmptyBadRecord) val existingPartitions = sparkSession.sessionState.catalog.listPartitions( identifier, - Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get)))) + Some(updatedPartitions)) val partitionNames = existingPartitions.toList.flatMap { partition => partition.spec.seq.map{case (column, value) => column + "=" + value} }.toSet http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 20a6bab..2f12bef 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -143,7 +143,7 @@ private[sql] case class CarbonProjectForUpdateCommand( CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) case e: Exception => - LOGGER.error("Exception in update operation" + e) + LOGGER.error(e, "Exception in update operation") // ****** start clean up. // In case of failure , clean all related delete delta files CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "") http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index c43a204..d74e461 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -245,7 +245,7 @@ private class CarbonOutputWriter(path: String, null } } - lazy val partitionData = if (partitions.nonEmpty) { + lazy val (updatedPartitions, partitionData) = if (partitions.nonEmpty) { val updatedPartitions = partitions.map{ p => val value = p.substring(p.indexOf("=") + 1, p.length) val col = p.substring(0, p.indexOf("=")) @@ -273,24 +273,25 @@ private class CarbonOutputWriter(path: String, dateFormatString = loadModel.getDefaultDateFormat } val dateFormat = new SimpleDateFormat(dateFormatString) - updatedPartitions.map {case (col, value) => + val formattedPartitions = updatedPartitions.map {case 
(col, value) => // Only convert the static partitions to the carbon format and use it while loading data // to carbon. if (staticPartition.getOrDefault(col, false)) { - CarbonScalaUtil.convertToCarbonFormat(value, + (col, CarbonScalaUtil.convertToCarbonFormat(value, CarbonScalaUtil.convertCarbonToSparkDataType( table.getColumnByName(table.getTableName, col).getDataType), timeFormat, - dateFormat) + dateFormat)) } else { - value + (col, value) } } + (formattedPartitions, formattedPartitions.map(_._2)) } else { - updatedPartitions.map(_._2) + (updatedPartitions, updatedPartitions.map(_._2)) } } else { - Array.empty + (Map.empty[String, String].toArray, Array.empty) } val writable = new StringArrayWritable() @@ -346,41 +347,17 @@ private class CarbonOutputWriter(path: String, val isEmptyBadRecord = loadModel.getIsEmptyDataBadRecord.split(",")(1).toBoolean // write partition info to new file. val partitonList = new util.ArrayList[String]() - val splitPartitions = partitions.map{ p => - val value = p.substring(p.indexOf("=") + 1, p.length) - val col = p.substring(0, p.indexOf("=")) - (col, value) - }.toMap - val updatedPartitions = - if (staticPartition != null) { - // There can be scnerio like dynamic and static combination, in that case we should convert - // only the dyanamic partition values to the proper format and store to carbon parttion map - splitPartitions.map { case (col, value) => - if (!staticPartition.getOrDefault(col, false)) { - CarbonScalaUtil.updatePartitions( - Seq((col, value)).toMap, - table, - timeFormat, - dateFormat, - serializeFormat, - badRecordAction, - isEmptyBadRecord).toSeq.head - } else { - (col, value) - } - } - } else { - // All dynamic partitions need to be converted to proper format - CarbonScalaUtil.updatePartitions( - splitPartitions, - table, - timeFormat, - dateFormat, - serializeFormat, - badRecordAction, - isEmptyBadRecord) - } - updatedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2)) + val formattedPartitions = + // 
All dynamic partitions need to be converted to proper format + CarbonScalaUtil.updatePartitions( + updatedPartitions.toMap, + table, + timeFormat, + dateFormat, + serializeFormat, + badRecordAction, + isEmptyBadRecord) + formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2)) new PartitionMapFileStore().writePartitionMapFile( segmentPath, loadModel.getTaskNo, http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index eb59184..93c7c09 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -182,10 +182,10 @@ object CarbonMetaStoreFactory { def createCarbonMetaStore(conf: RuntimeConfig): CarbonMetaStore = { val readSchemaFromHiveMetaStore = readSchemaFromHive(conf) if (readSchemaFromHiveMetaStore) { - LOGGER.info("Hive based carbon metastore is enabled") + LOGGER.audit("Hive based carbon metastore is enabled") new CarbonHiveMetaStore() } else { - LOGGER.info("File based carbon metastore is enabled") + LOGGER.audit("File based carbon metastore is enabled") new CarbonFileMetastore() } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala index 837b21f..15ccb0c 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala @@ -42,11 +42,11 @@ class 
CarbonSQLConf(sparkSession: SparkSession) { CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) val CARBON_CUSTOM_BLOCK_DISTRIBUTION = SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) - .doc("To enable/ disable carbon custom block distribution.") - .booleanConf + .doc("To set carbon task distribution.") + .stringConf .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) + .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) val BAD_RECORDS_LOGGER_ENABLE = SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) .doc("To enable/ disable carbon bad record logger.") @@ -117,8 +117,8 @@ class CarbonSQLConf(sparkSession: SparkSession) { CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, carbonProperties - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) + .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala index eef6604..2128ffd 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala 
+++ b/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala @@ -41,11 +41,11 @@ class CarbonSQLConf(sparkSession: SparkSession) { CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) val CARBON_CUSTOM_BLOCK_DISTRIBUTION = buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) - .doc("To enable/ disable carbon custom block distribution.") - .booleanConf + .doc("To set carbon task distribution.") + .stringConf .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) + .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) val BAD_RECORDS_LOGGER_ENABLE = buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) .doc("To enable/ disable carbon bad record logger.") @@ -116,8 +116,8 @@ class CarbonSQLConf(sparkSession: SparkSession) { CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, carbonProperties - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) + .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
