Repository: carbondata
Updated Branches:
  refs/heads/master e8df8ba51 -> 90ce0d459


Enhance update performance by increasing parallelism

+ Increase parallelism while processing one segment in update
+ Use partitionBy instead of groupby
+ Return directly for no-rows-update case
+ Add a property to configure the parallelism
+ Clean up local files after update (previous bugs)
+ Remove useless imports

fix code style

(cherry picked from commit 49d44b156a77d005d21123c886dc4332bf1f03cf)


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/90ce0d45
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/90ce0d45
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/90ce0d45

Branch: refs/heads/master
Commit: 90ce0d459abc76573e24ba57ec1a58c85e77abdd
Parents: e8df8ba
Author: xuchuanyin <[email protected]>
Authored: Fri Aug 11 23:00:20 2017 +0800
Committer: Jacky Li <[email protected]>
Committed: Fri Aug 18 14:56:09 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 18 +++++
 .../carbondata/core/util/CarbonProperties.java  | 32 ++++++++
 docs/configuration-parameters.md                |  2 +-
 .../iud/UpdateCarbonTableTestCase.scala         | 24 ++++++
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  3 +
 .../spark/rdd/CarbonDataRDDFactory.scala        | 77 ++++++++++++-------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 79 +++++++++++++-------
 7 files changed, 177 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index de8cdb1..8939a7e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -544,6 +544,10 @@ public final class CarbonCommonConstants {
    */
   public static final String UNDERSCORE = "_";
   /**
+   * DASH
+   */
+  public static final String DASH = "-";
+  /**
    * POINT
    */
   public static final String POINT = ".";
@@ -1330,6 +1334,20 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT = 
"MEMORY_ONLY";
 
+  /**
+   * property for configuring parallelism per segment when doing an update. 
Increase this
+   * value will avoid data screw problem for a large segment.
+   * Refer to CARBONDATA-1373 for more details.
+   */
+  @CarbonProperty
+  public static final String CARBON_UPDATE_SEGMENT_PARALLELISM =
+      "carbon.update.segment.parallelism";
+
+  /**
+   * In default we will not optimize the update
+   */
+  public static final String CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT = "1";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 2755990..669d3f2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -895,6 +895,38 @@ public final class CarbonProperties {
   }
 
   /**
+   * Returns parallelism for segment update
+   * @return int
+   */
+  public int getParallelismForSegmentUpdate() {
+    int parallelism = Integer.parseInt(
+        CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT);
+    boolean isInvalidValue = false;
+    try {
+      String strParallelism = 
getProperty(CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM,
+          CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT);
+      parallelism = Integer.parseInt(strParallelism);
+      if (parallelism <= 0 || parallelism > 1000) {
+        isInvalidValue = true;
+      }
+    } catch (NumberFormatException e) {
+      isInvalidValue = true;
+    }
+
+    if (isInvalidValue) {
+      LOGGER.error("The specified value for property "
+          + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM
+          + " is incorrect. Correct value should be in range of 0 - 1000."
+          + " Taking the default value: "
+          + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT);
+      parallelism = Integer.parseInt(
+          CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT);
+    }
+
+    return parallelism;
+  }
+
+  /**
    * returns true if carbon property
    * @param key
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 0223688..bdd551a 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -74,7 +74,7 @@ This section provides the details of all the configurations 
required for CarbonD
 | carbon.horizontal.compaction.enable | true | This property is used to turn 
ON/OFF horizontal compaction. After every DELETE and UPDATE statement, 
horizontal compaction may occur in case the delta (DELETE/ UPDATE) files 
becomes more than specified threshold. |  |
 | carbon.horizontal.UPDATE.compaction.threshold | 1 | This property specifies 
the threshold limit on number of UPDATE delta files within a segment. In case 
the number of delta files goes beyond the threshold, the UPDATE delta files 
within the segment becomes eligible for horizontal compaction and compacted 
into single UPDATE delta file. | Values between 1 to 10000. |
 | carbon.horizontal.DELETE.compaction.threshold | 1 | This property specifies 
the threshold limit on number of DELETE delta files within a block of a 
segment. In case the number of delta files goes beyond the threshold, the 
DELETE delta files for the particular block of the segment becomes eligible for 
horizontal compaction and compacted into single DELETE delta file. | Values 
between 1 to 10000. |
-
+| carbon.update.segment.parallelism | 1 | This property specifies the 
parallelism for each segment during update. If there are segments that contain 
too many records to update and the spark job encounter data-spill related 
errors, it is better to increase this property value. It is recommended to set 
this value to a multiple of the number of executors for balance. | Values 
between 1 to 1000. |
   
 
 * **Query Configuration**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 364cb81..ff0aadf 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -114,6 +114,30 @@ class UpdateCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("""drop table if exists iud.dest33""")
   }
 
+  test("update carbon table with optimized parallelism for segment") {
+    sql("""drop table if exists iud.dest_opt_segment_parallelism""")
+    sql(
+      """create table iud.dest_opt_segment_parallelism (c1 string,c2 int,c3 
string,c5 string)
+        | STORED BY 'org.apache.carbondata.format'""".stripMargin)
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv'
+         | INTO table iud.dest_opt_segment_parallelism""".stripMargin)
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv'
+         | INTO table iud.dest_opt_segment_parallelism""".stripMargin)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "3")
+    sql(
+      """update iud.dest_opt_segment_parallelism d
+        | set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = 
s.c11)
+        | where d.c1 = 'a'""".stripMargin).show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest_opt_segment_parallelism where 
c1='a'"""),
+      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
+    )
+    sql("""drop table if exists iud.dest_opt_segment_parallelism""")
+  }
+
   test("update carbon table without alias in set three columns") {
     sql("""drop table if exists iud.dest44""")
     sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 4cf2135..f45dc83 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -28,6 +28,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
 /**
  * Data load in case of update command .
@@ -62,6 +63,8 @@ object UpdateDataLoad {
       case e: Exception =>
         LOGGER.error(e)
         throw e
+    } finally {
+      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(carbonLoadModel, 
false, false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7c2bf22..669f942 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark.rdd
 
 import java.text.SimpleDateFormat
 import java.util
-import java.util.UUID
 import java.util.concurrent._
 
 import scala.collection.JavaConverters._
@@ -32,8 +31,8 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, 
NewHadoopRDD, RDD, UpdateCoalescedRDD}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, 
NewHadoopRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, 
CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
@@ -51,14 +50,14 @@ import 
org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, 
StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, 
CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import 
org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, 
CarbonDataLoadingException}
+import 
org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
 import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
@@ -583,7 +582,9 @@ object CarbonDataRDDFactory {
       }
 
       def loadDataFrameForUpdate(): Unit = {
-        def triggerDataLoadForSegment(key: String,
+        val segmentUpdateParallelism = 
CarbonProperties.getInstance().getParallelismForSegmentUpdate
+
+        def triggerDataLoadForSegment(key: String, taskNo: Int,
             iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
           val rddResult = new updateResultImpl()
           val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -594,11 +595,7 @@ object CarbonDataRDDFactory {
             var uniqueLoadStatusId = ""
             try {
               val segId = key
-              val taskNo = CarbonUpdateUtil
-                .getLatestTaskIdForSegment(segId,
-                  
CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
-                    carbonTable.getCarbonTableIdentifier))
-              val index = taskNo + 1
+              val index = taskNo
               uniqueLoadStatusId = carbonLoadModel.getTableName +
                                    CarbonCommonConstants.UNDERSCORE +
                                    (index + "_0")
@@ -621,8 +618,6 @@ object CarbonDataRDDFactory {
 
               // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, 
index)
               
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY +
-                                   UUID.randomUUID().toString
               UpdateDataLoad.DataLoadForUpdate(segId,
                 index,
                 iter,
@@ -657,26 +652,52 @@ object CarbonDataRDDFactory {
 
         val updateRdd = dataFrame.get.rdd
 
+        // return directly if no rows to update
+        val noRowsToUpdate = updateRdd.isEmpty()
+        if (noRowsToUpdate) {
+          res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
+          return
+        }
 
+        // splitting as (key, value) i.e., (segment, updatedRows)
         val keyRDD = updateRdd.map(row =>
-          // splitting as (key, value) i.e., (segment, updatedRows)
-          (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 
1): _*))
-        )
-        val groupBySegmentRdd = keyRDD.groupByKey()
+          (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 
1): _*)))
+
+        val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
+          carbonTable.getMetaDataFilepath)
+        val segmentIds = loadMetadataDetails.map(_.getLoadName)
+        val segmentIdIndex = segmentIds.zipWithIndex.toMap
+        val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
+          carbonTable.getCarbonTableIdentifier)
+        val segmentId2maxTaskNo = segmentIds.map { segId =>
+          (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, 
carbonTablePath))
+        }.toMap
+
+        class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: 
Int)
+          extends org.apache.spark.Partitioner {
+          override def numPartitions: Int = segmentIdIndex.size * parallelism
+
+          override def getPartition(key: Any): Int = {
+            val segId = key.asInstanceOf[String]
+            // partitionId
+            segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
+          }
+        }
 
-        val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, 
Array[String]] { p =>
-          DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, 
p).map(_.host)
-        }.distinct.size
-        val nodes = 
DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-          sqlContext.sparkContext)
-        val groupBySegmentAndNodeRdd =
-          new UpdateCoalescedRDD[(String, 
scala.Iterable[Row])](groupBySegmentRdd,
-            nodes.distinct.toArray)
+        val partitionByRdd = keyRDD.partitionBy(new 
SegmentPartitioner(segmentIdIndex,
+          segmentUpdateParallelism))
 
-        res = groupBySegmentAndNodeRdd.map(x =>
-          triggerDataLoadForSegment(x._1, x._2.toIterator).toList
-        ).collect()
+        // because partitionId=segmentIdIndex*parallelism+RandomPart and 
RandomPart<parallelism,
+        // so segmentIdIndex=partitionId/parallelism, this has been verified.
+        res = partitionByRdd.map(_._2).mapPartitions { partition =>
+          val partitionId = TaskContext.getPartitionId()
+          val segIdIndex = partitionId / segmentUpdateParallelism
+          val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
+          val segId = segmentIds(segIdIndex)
+          val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
 
+          List(triggerDataLoadForSegment(segId, newTaskNo, 
partition).toList).toIterator
+        }.collect()
       }
 
       def loadDataForPartitionTable(): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90ce0d45/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f556a05..34872b2 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark.rdd
 
 import java.text.SimpleDateFormat
 import java.util
-import java.util.UUID
 import java.util.concurrent._
 
 import scala.collection.JavaConverters._
@@ -32,10 +31,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, 
NewHadoopRDD, RDD, UpdateCoalescedRDD}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, 
NewHadoopRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.{AlterTableModel, 
CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
@@ -59,8 +58,8 @@ import 
org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, S
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, 
CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
-import 
org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, 
CarbonDataLoadingException}
+import 
org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
@@ -682,7 +681,9 @@ object CarbonDataRDDFactory {
       }
 
       def loadDataFrameForUpdate(): Unit = {
-        def triggerDataLoadForSegment(key: String,
+        val segmentUpdateParallelism = 
CarbonProperties.getInstance().getParallelismForSegmentUpdate
+
+        def triggerDataLoadForSegment(key: String, taskNo: Int,
             iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
           val rddResult = new updateResultImpl()
           val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -693,11 +694,7 @@ object CarbonDataRDDFactory {
             var uniqueLoadStatusId = ""
             try {
               val segId = key
-              val taskNo = CarbonUpdateUtil
-                .getLatestTaskIdForSegment(segId,
-                  
CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
-                    carbonTable.getCarbonTableIdentifier))
-              val index = taskNo + 1
+              val index = taskNo
               uniqueLoadStatusId = carbonLoadModel.getTableName +
                                    CarbonCommonConstants.UNDERSCORE +
                                    (index + "_0")
@@ -720,8 +717,6 @@ object CarbonDataRDDFactory {
 
               // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, 
index)
               
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY +
-                                   UUID.randomUUID().toString
               UpdateDataLoad.DataLoadForUpdate(segId,
                 index,
                 iter,
@@ -756,26 +751,52 @@ object CarbonDataRDDFactory {
 
         val updateRdd = dataFrame.get.rdd
 
+        // return directly if no rows to update
+        val noRowsToUpdate = updateRdd.isEmpty()
+        if (noRowsToUpdate) {
+          res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
+          return
+        }
 
+        // splitting as (key, value) i.e., (segment, updatedRows)
         val keyRDD = updateRdd.map(row =>
-          // splitting as (key, value) i.e., (segment, updatedRows)
-          (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 
1): _*))
-        )
-        val groupBySegmentRdd = keyRDD.groupByKey()
+            (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 
1): _*)))
+
+        val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
+          carbonTable.getMetaDataFilepath)
+        val segmentIds = loadMetadataDetails.map(_.getLoadName)
+        val segmentIdIndex = segmentIds.zipWithIndex.toMap
+        val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
+          carbonTable.getCarbonTableIdentifier)
+        val segmentId2maxTaskNo = segmentIds.map { segId =>
+          (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, 
carbonTablePath))
+        }.toMap
+
+        class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: 
Int)
+          extends org.apache.spark.Partitioner {
+          override def numPartitions: Int = segmentIdIndex.size * parallelism
+
+          override def getPartition(key: Any): Int = {
+            val segId = key.asInstanceOf[String]
+            // partitionId
+            segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
+          }
+        }
 
-        val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, 
Array[String]] { p =>
-          DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, 
p).map(_.host)
-        }.distinct.size
-        val nodes = 
DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-          sqlContext.sparkContext)
-        val groupBySegmentAndNodeRdd =
-          new UpdateCoalescedRDD[(String, 
scala.Iterable[Row])](groupBySegmentRdd,
-            nodes.distinct.toArray)
+        val partitionByRdd = keyRDD.partitionBy(new 
SegmentPartitioner(segmentIdIndex,
+          segmentUpdateParallelism))
 
-        res = groupBySegmentAndNodeRdd.map(x =>
-          triggerDataLoadForSegment(x._1, x._2.toIterator).toList
-        ).collect()
+        // because partitionId=segmentIdIndex*parallelism+RandomPart and 
RandomPart<parallelism,
+        // so segmentIdIndex=partitionId/parallelism, this has been verified.
+        res = partitionByRdd.map(_._2).mapPartitions { partition =>
+          val partitionId = TaskContext.getPartitionId()
+          val segIdIndex = partitionId / segmentUpdateParallelism
+          val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
+          val segId = segmentIds(segIdIndex)
+          val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
 
+          List(triggerDataLoadForSegment(segId, newTaskNo, 
partition).toList).toIterator
+        }.collect()
       }
 
       def loadDataForPartitionTable(): Unit = {

Reply via email to