This is an automated email from the ASF dual-hosted git repository. qiangcai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 0f6dff4 [CARBONDATA-3763] Fix wrong insert result during insert stage command 0f6dff4 is described below commit 0f6dff4a712714d8b15f7e9a75cd7e87e34c9d5e Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Fri Apr 3 20:21:52 2020 +0530 [CARBONDATA-3763] Fix wrong insert result during insert stage command Why is this PR needed? For CarbonInsertFromStageCommand, Spark reuses the same internalRow instance because we transform twice: RDD[InternalRow] -> DataFrame -> logical plan -> RDD[InternalRow]. As a result, the same data is inserted into other rows. What changes were proposed in this PR? Copy the internalRow after the last transformation. This closes #3694 --- .../carbon/flink/TestCarbonPartitionWriter.scala | 3 +++ .../spark/rdd/CarbonDataRDDFactory.scala | 5 +++- .../management/CarbonInsertFromStageCommand.scala | 7 +++-- .../command/management/CommonLoadUtils.scala | 30 +++++++++++++++++----- .../constants/DataLoadProcessorConstants.java | 3 +++ 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala index 6ca877c..73284ff 100644 --- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala +++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala @@ -194,6 +194,9 @@ class TestCarbonPartitionWriter extends QueryTest { checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000))) + checkAnswer(sql(s"SELECT stringField FROM $tableName order by stringField limit 2"), + Seq(Row("test0"), Row("test1"))) + val rows = sql(s"SELECT * FROM $tableName limit 1").collect() assertResult(1)(rows.length) assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0)) diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index d57546c..8709d11 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -375,7 +375,10 @@ object CarbonDataRDDFactory { .getFactTable .getListOfColumns .asScala.filterNot(col => col.isInvisible || col.getColumnName.contains(".")) - val convertedRdd = CommonLoadUtils.getConvertedInternalRow(colSchema, scanResultRdd.get) + val convertedRdd = CommonLoadUtils.getConvertedInternalRow( + colSchema, + scanResultRdd.get, + isInsertFromStageCommand = false) if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext .sparkSession, diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala index dd1efe8..c323f10 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala @@ -44,6 +44,7 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusMan import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.CarbonInputSplit import org.apache.carbondata.processing.loading.FailureCauses +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.util.CarbonLoaderUtil 
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark @@ -352,8 +353,10 @@ case class CarbonInsertFromStageCommand( CarbonInsertIntoCommand( databaseNameOp = Option(table.getDatabaseName), tableName = table.getTableName, - options = scala.collection.immutable.Map("fileheader" -> header, - "binary_decoder" -> "base64"), + options = scala.collection.immutable.Map( + "fileheader" -> header, + "binary_decoder" -> "base64", + DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND -> "true"), isOverwriteTable = false, logicalPlan = selectedDataFrame.queryExecution.analyzed, tableInfo = table.getTableInfo, diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala index 20222fe..a7cc48a 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala @@ -551,7 +551,8 @@ object CommonLoadUtils { curAttributes: Seq[AttributeReference], sortScope: SortScopeOptions.SortScope, table: CarbonTable, - partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = { + partition: Map[String, Option[String]], + isInsertFromStageCommand: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = { // keep partition column to end if exists var colSchema = table.getTableInfo .getFactTable @@ -571,7 +572,10 @@ object CommonLoadUtils { colSchema = colSchema.filterNot(x => x.isInvisible || x.getColumnName.contains(".") || x.getSchemaOrdinal == -1) } - val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(colSchema, rdd) + val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow( + colSchema, + rdd, + isInsertFromStageCommand) transformQuery(updatedRdd, sparkSession, loadModel, @@ -722,8 
+726,10 @@ object CommonLoadUtils { } } - def getConvertedInternalRow(columnSchema: Seq[ColumnSchema], - rdd: RDD[InternalRow]): RDD[InternalRow] = { + def getConvertedInternalRow( + columnSchema: Seq[ColumnSchema], + rdd: RDD[InternalRow], + isInsertFromStageCommand: Boolean): RDD[InternalRow] = { // Converts the data as per the loading steps before give it to writer or sorter var timeStampIndex = scala.collection.mutable.Set[Int]() var dateIndex = scala.collection.mutable.Set[Int]() @@ -740,7 +746,17 @@ object CommonLoadUtils { } i = i + 1 } - val updatedRdd: RDD[InternalRow] = rdd.map { internalRow => + val updatedRdd: RDD[InternalRow] = rdd.map { internalRowOriginal => + val internalRow = if (isInsertFromStageCommand) { + // Insert stage command, logical plan already consist of LogicalRDD of internalRow. + // When it is converted to DataFrame, spark is reusing the same internalRow. + // So, need to have a copy before the last transformation. + // TODO: Even though copying internalRow is faster, we should avoid it + // by finding a better way + internalRowOriginal.copy() + } else { + internalRowOriginal + } for (index <- timeStampIndex) { if (internalRow.getLong(index) == 0) { internalRow.setNullAt(index) @@ -961,7 +977,9 @@ object CommonLoadUtils { attributes, sortScope, table, - loadParams.finalPartition) + loadParams.finalPartition, + loadParams.optionsOriginal + .contains(DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND)) partitionsLen = partitions persistedRDD = persistedRDDLocal transformedPlan diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java index c7ef81b..c3cf1a4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java @@ -40,4 +40,7 @@ public final class DataLoadProcessorConstants { // to indicate that it is optimized insert flow without rearrange of each data rows public static final String NO_REARRANGE_OF_ROWS = "NO_REARRANGE_OF_ROWS"; + + // to indicate CarbonInsertFromStageCommand flow + public static final String IS_INSERT_STAGE_COMMAND = "IS_INSERT_STAGE_COMMAND"; }