Repository: carbondata Updated Branches: refs/heads/master e8df8ba51 -> 90ce0d459
Enhance update performance by increasing parallelism + Increase parallelism while processing one segment in update + Use partitionBy instead of groupby + Return directly for no-rows-update case + Add a property to configure the parallelism + Clean up local files after update (previous bugs) + Remove useless imports fix code style (cherry picked from commit 49d44b156a77d005d21123c886dc4332bf1f03cf) Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/90ce0d45 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/90ce0d45 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/90ce0d45 Branch: refs/heads/master Commit: 90ce0d459abc76573e24ba57ec1a58c85e77abdd Parents: e8df8ba Author: xuchuanyin <[email protected]> Authored: Fri Aug 11 23:00:20 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Fri Aug 18 14:56:09 2017 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 18 +++++ .../carbondata/core/util/CarbonProperties.java | 32 ++++++++ docs/configuration-parameters.md | 2 +- .../iud/UpdateCarbonTableTestCase.scala | 24 ++++++ .../carbondata/spark/rdd/UpdateDataLoad.scala | 3 + .../spark/rdd/CarbonDataRDDFactory.scala | 77 ++++++++++++------- .../spark/rdd/CarbonDataRDDFactory.scala | 79 +++++++++++++------- 7 files changed, 177 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index de8cdb1..8939a7e 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -544,6 +544,10 @@ public final class CarbonCommonConstants { */ public static final String UNDERSCORE = "_"; /** + * DASH + */ + public static final String DASH = "-"; + /** * POINT */ public static final String POINT = "."; @@ -1330,6 +1334,20 @@ public final class CarbonCommonConstants { */ public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT = "MEMORY_ONLY"; + /** + * property for configuring parallelism per segment when doing an update. Increasing this + * value will avoid data skew problem for a large segment. + * Refer to CARBONDATA-1373 for more details. + */ + @CarbonProperty + public static final String CARBON_UPDATE_SEGMENT_PARALLELISM = + "carbon.update.segment.parallelism"; + + /** + * By default we will not optimize the update + */ + public static final String CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT = "1"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 2755990..669d3f2 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -895,6 +895,38 @@ public final class CarbonProperties { } /** + * Returns parallelism for segment update + * @return int + */ + public int getParallelismForSegmentUpdate() { + int parallelism = Integer.parseInt( + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT); + boolean isInvalidValue = false; + try { + String strParallelism = 
getProperty(CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT); + parallelism = Integer.parseInt(strParallelism); + if (parallelism <= 0 || parallelism > 1000) { + isInvalidValue = true; + } + } catch (NumberFormatException e) { + isInvalidValue = true; + } + + if (isInvalidValue) { + LOGGER.error("The specified value for property " + + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM + + " is incorrect. Correct value should be in range of 0 - 1000." + + " Taking the default value: " + + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT); + parallelism = Integer.parseInt( + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT); + } + + return parallelism; + } + + /** * returns true if carbon property * @param key * @return http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/docs/configuration-parameters.md ---------------------------------------------------------------------- diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md index 0223688..bdd551a 100644 --- a/docs/configuration-parameters.md +++ b/docs/configuration-parameters.md @@ -74,7 +74,7 @@ This section provides the details of all the configurations required for CarbonD | carbon.horizontal.compaction.enable | true | This property is used to turn ON/OFF horizontal compaction. After every DELETE and UPDATE statement, horizontal compaction may occur in case the delta (DELETE/ UPDATE) files becomes more than specified threshold. | | | carbon.horizontal.UPDATE.compaction.threshold | 1 | This property specifies the threshold limit on number of UPDATE delta files within a segment. In case the number of delta files goes beyond the threshold, the UPDATE delta files within the segment becomes eligible for horizontal compaction and compacted into single UPDATE delta file. | Values between 1 to 10000. 
| | carbon.horizontal.DELETE.compaction.threshold | 1 | This property specifies the threshold limit on number of DELETE delta files within a block of a segment. In case the number of delta files goes beyond the threshold, the DELETE delta files for the particular block of the segment becomes eligible for horizontal compaction and compacted into single DELETE delta file. | Values between 1 to 10000. | - +| carbon.update.segment.parallelism | 1 | This property specifies the parallelism for each segment during update. If there are segments that contain too many records to update and the spark job encounters data-spill related errors, it is better to increase this property value. It is recommended to set this value to a multiple of the number of executors for balance. | Values between 1 to 1000. | * **Query Configuration** http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index 364cb81..ff0aadf 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -114,6 +114,30 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("""drop table if exists iud.dest33""") } + test("update carbon table with optimized parallelism for segment") { + sql("""drop table if exists iud.dest_opt_segment_parallelism""") + sql( + """create table iud.dest_opt_segment_parallelism (c1 string,c2 int,c3 string,c5 string) + | 
STORED BY 'org.apache.carbondata.format'""".stripMargin) + sql( + s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' + | INTO table iud.dest_opt_segment_parallelism""".stripMargin) + sql( + s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' + | INTO table iud.dest_opt_segment_parallelism""".stripMargin) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "3") + sql( + """update iud.dest_opt_segment_parallelism d + | set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) + | where d.c1 = 'a'""".stripMargin).show() + checkAnswer( + sql("""select c3,c5 from iud.dest_opt_segment_parallelism where c1='a'"""), + Seq(Row("MGM","Disco"),Row("MGM","Disco")) + ) + sql("""drop table if exists iud.dest_opt_segment_parallelism""") + } + test("update carbon table without alias in set three columns") { sql("""drop table if exists iud.dest44""") sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala index 4cf2135..f45dc83 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala @@ -28,6 +28,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.LoadMetadataDetails import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.newflow.DataLoadExecutor +import 
org.apache.carbondata.spark.load.CarbonLoaderUtil /** * Data load in case of update command . @@ -62,6 +63,8 @@ object UpdateDataLoad { case e: Exception => LOGGER.error(e) throw e + } finally { + CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, false) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 7c2bf22..669f942 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -19,7 +19,6 @@ package org.apache.carbondata.spark.rdd import java.text.SimpleDateFormat import java.util -import java.util.UUID import java.util.concurrent._ import scala.collection.JavaConverters._ @@ -32,8 +31,8 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.NullWritable import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.spark.{SparkEnv, SparkException} -import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD, UpdateCoalescedRDD} +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD} import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext} import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel} import org.apache.spark.sql.hive.DistributionUtil @@ -51,14 +50,14 @@ import 
org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.scan.partition.PartitionUtil -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable} import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.processing.model.CarbonLoadModel -import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException} +import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException import org.apache.carbondata.processing.newflow.sort.SortScopeOptions @@ -583,7 +582,9 @@ object CarbonDataRDDFactory { } def loadDataFrameForUpdate(): Unit = { - def triggerDataLoadForSegment(key: String, + val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate + + def triggerDataLoadForSegment(key: String, taskNo: Int, iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = { val rddResult = new updateResultImpl() val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -594,11 +595,7 @@ object CarbonDataRDDFactory { var uniqueLoadStatusId = "" try { val segId = key - val taskNo = CarbonUpdateUtil - .getLatestTaskIdForSegment(segId, - 
CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath, - carbonTable.getCarbonTableIdentifier)) - val index = taskNo + 1 + val index = taskNo uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + (index + "_0") @@ -621,8 +618,6 @@ object CarbonDataRDDFactory { // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index) loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) - val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY + - UUID.randomUUID().toString UpdateDataLoad.DataLoadForUpdate(segId, index, iter, @@ -657,26 +652,52 @@ object CarbonDataRDDFactory { val updateRdd = dataFrame.get.rdd + // return directly if no rows to update + val noRowsToUpdate = updateRdd.isEmpty() + if (noRowsToUpdate) { + res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]() + return + } + // splitting as (key, value) i.e., (segment, updatedRows) val keyRDD = updateRdd.map(row => - // splitting as (key, value) i.e., (segment, updatedRows) - (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)) - ) - val groupBySegmentRdd = keyRDD.groupByKey() + (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))) + + val loadMetadataDetails = SegmentStatusManager.readLoadMetadata( + carbonTable.getMetaDataFilepath) + val segmentIds = loadMetadataDetails.map(_.getLoadName) + val segmentIdIndex = segmentIds.zipWithIndex.toMap + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath, + carbonTable.getCarbonTableIdentifier) + val segmentId2maxTaskNo = segmentIds.map { segId => + (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath)) + }.toMap + + class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int) + extends org.apache.spark.Partitioner { + override def numPartitions: Int = segmentIdIndex.size * parallelism + + override def getPartition(key: Any): Int = { + val segId = 
key.asInstanceOf[String] + // partitionId + segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism) + } + } - val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, Array[String]] { p => - DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, p).map(_.host) - }.distinct.size - val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData, - sqlContext.sparkContext) - val groupBySegmentAndNodeRdd = - new UpdateCoalescedRDD[(String, scala.Iterable[Row])](groupBySegmentRdd, - nodes.distinct.toArray) + val partitionByRdd = keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex, + segmentUpdateParallelism)) - res = groupBySegmentAndNodeRdd.map(x => - triggerDataLoadForSegment(x._1, x._2.toIterator).toList - ).collect() + // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism, + // so segmentIdIndex=partitionId/parallelism, this has been verified. + res = partitionByRdd.map(_._2).mapPartitions { partition => + val partitionId = TaskContext.getPartitionId() + val segIdIndex = partitionId / segmentUpdateParallelism + val randomPart = partitionId - segIdIndex * segmentUpdateParallelism + val segId = segmentIds(segIdIndex) + val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1 + List(triggerDataLoadForSegment(segId, newTaskNo, partition).toList).toIterator + }.collect() } def loadDataForPartitionTable(): Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index f556a05..34872b2 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -19,7 +19,6 @@ package org.apache.carbondata.spark.rdd import java.text.SimpleDateFormat import java.util -import java.util.UUID import java.util.concurrent._ import scala.collection.JavaConverters._ @@ -32,10 +31,10 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.NullWritable import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.spark.{SparkEnv, SparkException} -import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD, UpdateCoalescedRDD} +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD} import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext} -import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel} import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.util.SparkUtil @@ -59,8 +58,8 @@ import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, S import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.processing.model.CarbonLoadModel -import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder -import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException} +import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException import org.apache.carbondata.processing.newflow.sort.SortScopeOptions import org.apache.carbondata.processing.util.CarbonDataProcessorUtil import 
org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _} @@ -682,7 +681,9 @@ object CarbonDataRDDFactory { } def loadDataFrameForUpdate(): Unit = { - def triggerDataLoadForSegment(key: String, + val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate + + def triggerDataLoadForSegment(key: String, taskNo: Int, iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = { val rddResult = new updateResultImpl() val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -693,11 +694,7 @@ object CarbonDataRDDFactory { var uniqueLoadStatusId = "" try { val segId = key - val taskNo = CarbonUpdateUtil - .getLatestTaskIdForSegment(segId, - CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath, - carbonTable.getCarbonTableIdentifier)) - val index = taskNo + 1 + val index = taskNo uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + (index + "_0") @@ -720,8 +717,6 @@ object CarbonDataRDDFactory { // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index) loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) - val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY + - UUID.randomUUID().toString UpdateDataLoad.DataLoadForUpdate(segId, index, iter, @@ -756,26 +751,52 @@ object CarbonDataRDDFactory { val updateRdd = dataFrame.get.rdd + // return directly if no rows to update + val noRowsToUpdate = updateRdd.isEmpty() + if (noRowsToUpdate) { + res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]() + return + } + // splitting as (key, value) i.e., (segment, updatedRows) val keyRDD = updateRdd.map(row => - // splitting as (key, value) i.e., (segment, updatedRows) - (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)) - ) - val groupBySegmentRdd = keyRDD.groupByKey() + (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))) + + val loadMetadataDetails 
= SegmentStatusManager.readLoadMetadata( + carbonTable.getMetaDataFilepath) + val segmentIds = loadMetadataDetails.map(_.getLoadName) + val segmentIdIndex = segmentIds.zipWithIndex.toMap + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath, + carbonTable.getCarbonTableIdentifier) + val segmentId2maxTaskNo = segmentIds.map { segId => + (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath)) + }.toMap + + class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int) + extends org.apache.spark.Partitioner { + override def numPartitions: Int = segmentIdIndex.size * parallelism + + override def getPartition(key: Any): Int = { + val segId = key.asInstanceOf[String] + // partitionId + segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism) + } + } - val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, Array[String]] { p => - DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, p).map(_.host) - }.distinct.size - val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData, - sqlContext.sparkContext) - val groupBySegmentAndNodeRdd = - new UpdateCoalescedRDD[(String, scala.Iterable[Row])](groupBySegmentRdd, - nodes.distinct.toArray) + val partitionByRdd = keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex, + segmentUpdateParallelism)) - res = groupBySegmentAndNodeRdd.map(x => - triggerDataLoadForSegment(x._1, x._2.toIterator).toList - ).collect() + // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism, + // so segmentIdIndex=partitionId/parallelism, this has been verified. 
+ res = partitionByRdd.map(_._2).mapPartitions { partition => + val partitionId = TaskContext.getPartitionId() + val segIdIndex = partitionId / segmentUpdateParallelism + val randomPart = partitionId - segIdIndex * segmentUpdateParallelism + val segId = segmentIds(segIdIndex) + val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1 + List(triggerDataLoadForSegment(segId, newTaskNo, partition).toList).toIterator + }.collect() } def loadDataForPartitionTable(): Unit = {
