This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4b59468 [CARBONDATA-3788] Fix insert failure during global sort with huge data in new insert flow
4b59468 is described below
commit 4b594686071185b910f6dcc7e51708b43e62d541
Author: ajantha-bhat <[email protected]>
AuthorDate: Thu Apr 30 09:26:10 2020 +0530
[CARBONDATA-3788] Fix insert failure during global sort with huge data in new insert flow
Why is this PR needed?
Spark reuses the same internalRow instance in the global sort partition flow with huge data,
because the RDD of internal rows is persisted for the global sort.
What changes were proposed in this PR?
Take a copy of the internalRow and work on the copy before the last transform in the
global sort partition flow.
This was already done for the insert stage command (which uses global sort partition).
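The gist of the fix, as a minimal sketch (names here are illustrative, not the actual CarbonData helper): Spark's scan reuses one mutable InternalRow instance, so each row has to be copied before the RDD[InternalRow] is persisted for the global sort, otherwise every cached entry ends up referencing the last row read.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow

    // Hypothetical helper: copy each reused row before the RDD is persisted
    // and sorted, so later stages see independent row instances.
    def copyBeforePersist(rows: RDD[InternalRow]): RDD[InternalRow] = {
      rows.map(_.copy())
    }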
This closes #3732
---
.../carbondata/spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../management/CarbonInsertFromStageCommand.scala | 7 ++-----
.../execution/command/management/CommonLoadUtils.scala | 16 +++++++---------
.../loading/constants/DataLoadProcessorConstants.java | 3 ---
4 files changed, 10 insertions(+), 18 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8b67bd4..a8fda4d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -378,7 +378,7 @@ object CarbonDataRDDFactory {
val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
colSchema,
scanResultRdd.get,
- isInsertFromStageCommand = false)
+ isGlobalSortPartition = false)
if (isSortTable &&
sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
.sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index c323f10..dd1efe8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusMan
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
@@ -353,10 +352,8 @@ case class CarbonInsertFromStageCommand(
CarbonInsertIntoCommand(
databaseNameOp = Option(table.getDatabaseName),
tableName = table.getTableName,
- options = scala.collection.immutable.Map(
- "fileheader" -> header,
- "binary_decoder" -> "base64",
- DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND -> "true"),
+ options = scala.collection.immutable.Map("fileheader" -> header,
+ "binary_decoder" -> "base64"),
isOverwriteTable = false,
logicalPlan = selectedDataFrame.queryExecution.analyzed,
tableInfo = table.getTableInfo,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index e62078a..b7388f0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -550,8 +550,7 @@ object CommonLoadUtils {
curAttributes: Seq[AttributeReference],
sortScope: SortScopeOptions.SortScope,
table: CarbonTable,
- partition: Map[String, Option[String]],
- isInsertFromStageCommand: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+ partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
// keep partition column to end if exists
var colSchema = table.getTableInfo
.getFactTable
@@ -574,7 +573,7 @@ object CommonLoadUtils {
val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(
colSchema,
rdd,
- isInsertFromStageCommand)
+ sortScope == SortScopeOptions.SortScope.GLOBAL_SORT)
transformQuery(updatedRdd,
sparkSession,
loadModel,
@@ -728,7 +727,7 @@ object CommonLoadUtils {
def getConvertedInternalRow(
columnSchema: Seq[ColumnSchema],
rdd: RDD[InternalRow],
- isInsertFromStageCommand: Boolean): RDD[InternalRow] = {
+ isGlobalSortPartition: Boolean): RDD[InternalRow] = {
// Converts the data as per the loading steps before give it to writer or sorter
var timeStampIndex = scala.collection.mutable.Set[Int]()
var dateIndex = scala.collection.mutable.Set[Int]()
@@ -746,8 +745,9 @@ object CommonLoadUtils {
i = i + 1
}
val updatedRdd: RDD[InternalRow] = rdd.map { internalRowOriginal =>
- val internalRow = if (isInsertFromStageCommand) {
- // Insert stage command, logical plan already consist of LogicalRDD of internalRow.
+ val internalRow = if (isGlobalSortPartition) {
+ // Insert stage command & global sort partition flow (where we persist rdd[internalRow]),
+ // logical plan already consist of LogicalRDD of internalRow.
// When it is converted to DataFrame, spark is reusing the same internalRow.
// So, need to have a copy before the last transformation.
// TODO: Even though copying internalRow is faster, we should avoid it
@@ -977,9 +977,7 @@ object CommonLoadUtils {
attributes,
sortScope,
table,
- loadParams.finalPartition,
- loadParams.optionsOriginal
- .contains(DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND))
+ loadParams.finalPartition)
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
index c3cf1a4..c7ef81b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -40,7 +40,4 @@ public final class DataLoadProcessorConstants {
// to indicate that it is optimized insert flow without rearrange of each data rows
public static final String NO_REARRANGE_OF_ROWS = "NO_REARRANGE_OF_ROWS";
-
- // to indicate CarbonInsertFromStageCommand flow
- public static final String IS_INSERT_STAGE_COMMAND = "IS_INSERT_STAGE_COMMAND";
}