This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b59468  [CARBONDATA-3788] Fix insert failure during global sort with huge data in new insert flow
4b59468 is described below

commit 4b594686071185b910f6dcc7e51708b43e62d541
Author: ajantha-bhat <[email protected]>
AuthorDate: Thu Apr 30 09:26:10 2020 +0530

    [CARBONDATA-3788] Fix insert failure during global sort with huge data in new insert flow
    
    Why is this PR needed?
    Spark reuses the internalRow in the global sort partition flow with huge data,
    because the RDD of InternalRow is persisted for global sort.
    
    What changes were proposed in this PR?
    Copy the internalRow and work on the copy before the last transform in the
    global sort partition flow.
    This was already being done for the insert stage command (which uses global
    sort partition).
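    
    As a minimal, hypothetical sketch of the idea (not the patched CarbonData code
    itself): Spark may hand back the same mutable InternalRow instance for every
    record, so an RDD[InternalRow] that will be persisted for global sort has to
    hold copies. The helper name makeRowsSafeToPersist below is illustrative only.
    
        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.catalyst.InternalRow
        import org.apache.spark.storage.StorageLevel
        
        object InternalRowCopySketch {
          // Illustrative helper: copy each row before the RDD is persisted,
          // otherwise later records overwrite the reused row instance and the
          // persisted data (and the subsequent global sort) is corrupted.
          def makeRowsSafeToPersist(scanResultRdd: RDD[InternalRow]): RDD[InternalRow] = {
            val copiedRows = scanResultRdd.map(_.copy())
            copiedRows.persist(StorageLevel.MEMORY_AND_DISK_SER)
          }
        }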
    
    This closes #3732
---
 .../carbondata/spark/rdd/CarbonDataRDDFactory.scala      |  2 +-
 .../management/CarbonInsertFromStageCommand.scala        |  7 ++-----
 .../execution/command/management/CommonLoadUtils.scala   | 16 +++++++---------
 .../loading/constants/DataLoadProcessorConstants.java    |  3 ---
 4 files changed, 10 insertions(+), 18 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8b67bd4..a8fda4d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -378,7 +378,7 @@ object CarbonDataRDDFactory {
             val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
               colSchema,
               scanResultRdd.get,
-              isInsertFromStageCommand = false)
+              isGlobalSortPartition = false)
             if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
               DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
                 .sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index c323f10..dd1efe8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusMan
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
@@ -353,10 +352,8 @@ case class CarbonInsertFromStageCommand(
         CarbonInsertIntoCommand(
           databaseNameOp = Option(table.getDatabaseName),
           tableName = table.getTableName,
-          options = scala.collection.immutable.Map(
-            "fileheader" -> header,
-            "binary_decoder" -> "base64",
-            DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND -> "true"),
+          options = scala.collection.immutable.Map("fileheader" -> header,
+            "binary_decoder" -> "base64"),
           isOverwriteTable = false,
           logicalPlan = selectedDataFrame.queryExecution.analyzed,
           tableInfo = table.getTableInfo,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index e62078a..b7388f0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -550,8 +550,7 @@ object CommonLoadUtils {
       curAttributes: Seq[AttributeReference],
       sortScope: SortScopeOptions.SortScope,
       table: CarbonTable,
-      partition: Map[String, Option[String]],
-      isInsertFromStageCommand: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+      partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
     // keep partition column to end if exists
     var colSchema = table.getTableInfo
       .getFactTable
@@ -574,7 +573,7 @@ object CommonLoadUtils {
     val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(
       colSchema,
       rdd,
-      isInsertFromStageCommand)
+      sortScope == SortScopeOptions.SortScope.GLOBAL_SORT)
     transformQuery(updatedRdd,
       sparkSession,
       loadModel,
@@ -728,7 +727,7 @@ object CommonLoadUtils {
   def getConvertedInternalRow(
       columnSchema: Seq[ColumnSchema],
       rdd: RDD[InternalRow],
-      isInsertFromStageCommand: Boolean): RDD[InternalRow] = {
+      isGlobalSortPartition: Boolean): RDD[InternalRow] = {
     // Converts the data as per the loading steps before give it to writer or sorter
     var timeStampIndex = scala.collection.mutable.Set[Int]()
     var dateIndex = scala.collection.mutable.Set[Int]()
@@ -746,8 +745,9 @@ object CommonLoadUtils {
       i = i + 1
     }
     val updatedRdd: RDD[InternalRow] = rdd.map { internalRowOriginal =>
-      val internalRow = if (isInsertFromStageCommand) {
-        // Insert stage command, logical plan already consist of LogicalRDD of internalRow.
+      val internalRow = if (isGlobalSortPartition) {
+        // Insert stage command & global sort partition flow (where we persist rdd[internalRow]),
+        // logical plan already consist of LogicalRDD of internalRow.
         // When it is converted to DataFrame, spark is reusing the same internalRow.
         // So, need to have a copy before the last transformation.
         // TODO: Even though copying internalRow is faster, we should avoid it
@@ -977,9 +977,7 @@ object CommonLoadUtils {
               attributes,
               sortScope,
               table,
-              loadParams.finalPartition,
-              loadParams.optionsOriginal
-                .contains(DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND))
+              loadParams.finalPartition)
           partitionsLen = partitions
           persistedRDD = persistedRDDLocal
           transformedPlan
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
index c3cf1a4..c7ef81b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -40,7 +40,4 @@ public final class DataLoadProcessorConstants {
 
  // to indicate that it is optimized insert flow without rearrange of each data rows
   public static final String NO_REARRANGE_OF_ROWS = "NO_REARRANGE_OF_ROWS";
-
-  // to indicate CarbonInsertFromStageCommand flow
-  public static final String IS_INSERT_STAGE_COMMAND = "IS_INSERT_STAGE_COMMAND";
 }
