This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new ab00397 [CARBONDATA-3929] Improve CDC performance
ab00397 is described below
commit ab003979d9f67f7b9183768eb2d799a872d3561e
Author: akashrn5 <[email protected]>
AuthorDate: Fri May 29 00:08:25 2020 +0530
[CARBONDATA-3929] Improve CDC performance
Why is this PR needed?
This PR improves the performance of the CDC merge. CDC merge is currently very
slow for the full outer join case and slow in the normal cases. The identified
pain points are:
1. We currently write the intermediate delete data in carbon format, which is
columnar, and then do a full scan of it. Because this data is only
intermediate, the full scan, compression, and columnar encoding are all wasted
time.
2. The full outer join case is very slow.
3. When we insert new data into new segments, we follow the old insert flow
with the converter step.
4. Since we write the intermediate data in carbon format, we use coalesce to
limit the number of partitions to the number of active executors.
What changes were proposed in this PR?
The following improvements are proposed (a hedged sketch of points 1 and 2
follows this list):
1. Write the intermediate data in a faster row format such as Avro.
2. Use bucketing on the join column and repartition the source DataFrame
before performing the join, which avoids the shuffle on one side; the shuffle
is the major time-consuming part of the join.
3. Route the insert into new segments through the new flow without the
converter step.
4. Remove the coalesce so that all available resources are used to write the
intermediate Avro data faster.
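To make points 1 and 2 concrete, here is a minimal sketch, not the committed
implementation: the column name "id", the bucket count, and the intermediate
path are illustrative assumptions. It repartitions the source DataFrame on the
join column to line up with the bucketed target table and spills the joined
intermediate data to Avro instead of carbon format.

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object CdcMergeSketch {

      // Hypothetical helper illustrating points 1 and 2; "id", numBuckets and
      // intermediatePath are illustrative, not the names used in the commit.
      def mergeSketch(
          spark: SparkSession,
          targetDf: DataFrame,
          srcDf: DataFrame,
          numBuckets: Int,
          intermediatePath: String): Unit = {

        // Point 2: repartition the (small) source side on the join column so
        // its partitioning lines up with the bucketed target table; only the
        // source side is shuffled before the join.
        val repartitionedSrc = srcDf.repartition(numBuckets, srcDf.col("id"))

        val joined = targetDf.join(
          repartitionedSrc,
          targetDf.col("id") === repartitionedSrc.col("id"),
          "full_outer")

        // Point 1: persist the intermediate rows in a row-oriented format
        // (Avro) rather than the columnar carbon format. Spark 2.4+ ships the
        // "avro" data source built in; Spark 2.3 needs the databricks
        // spark-avro package.
        joined.write.format("avro").mode("overwrite").save(intermediatePath)

        // Later stages can read the intermediate data back cheaply as RDD[Row].
        val deltaRdd = spark.read.format("avro").load(intermediatePath).rdd
        println(s"intermediate rows written: ${deltaRdd.count()}")
      }
    }

In the actual patch the repartition key and partition count are derived from
the target table's bucketing info, and the intermediate data is written and
read through AvroFileFormatFactory rather than the DataFrame writer.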
Performance results
Data size: 2 GB target table data, 230 MB source table data.
Inner join case: around 17,000+ deleted rows and about 70,400 updated rows.
Full outer join case: 2 million target rows, 0.2 million source rows, about
70,400 rows updated and some deleted.
Join Type          Old Time (sec)               New Time (sec)
                   1st query     2nd query      1st query     2nd query
Inner Join         20            9.6            14            4.6
Full Outer Join    43            17.8           26            7.7
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3856
---
integration/spark/pom.xml | 22 ++++
.../management/CarbonInsertIntoCommand.scala | 31 ++++--
.../mutation/merge/CarbonMergeDataSetCommand.scala | 115 +++++++++++----------
.../com/databricks/spark/avro/AvroWriter.scala | 51 +++++++++
.../spark/sql/avro/AvroFileFormatFactory.scala | 49 +++++++++
.../spark/sql/avro/AvroFileFormatFactory.scala | 48 +++++++++
.../spark/testsuite/merge/MergeTestCase.scala | 40 ++++++-
7 files changed, 295 insertions(+), 61 deletions(-)
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index b5f2b9d..d5f5772 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -154,6 +154,28 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.databricks</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ <version>4.0.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ <version>2.4.5</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<exclusions>
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 6a655e3..e622671 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv,
CarbonToSparkAdapter,
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal,
NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand,
UpdateTableModel}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.CausedBy
@@ -64,7 +64,8 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
var tableInfo: TableInfo,
var internalOptions: Map[String, String] = Map.empty,
var partition: Map[String, Option[String]] = Map.empty,
- var operationContext: OperationContext = new OperationContext)
+ var operationContext: OperationContext = new OperationContext,
+ var updateModel: Option[UpdateTableModel] = None)
extends AtomicRunnableCommand {
var table: CarbonTable = _
@@ -200,11 +201,20 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
options = options.asJava,
isOverwriteTable = isOverwriteTable,
isDataFrame = true,
- updateModel = None,
+ updateModel = updateModel,
operationContext = operationContext)
// add the start entry for the new load in the table status file
- if (!table.isHivePartitionTable) {
+ if ((updateModel.isEmpty || updateModel.isDefined &&
updateModel.get.loadAsNewSegment)
+ && !table.isHivePartitionTable) {
+ if (updateModel.isDefined ) {
+ carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
+ }
+ CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+ carbonLoadModel,
+ isOverwriteTable)
+ isUpdateTableStatusRequired = true
+ } else if (!table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
carbonLoadModel,
isOverwriteTable)
@@ -238,7 +248,7 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
partitionStatus,
None,
Some(scanResultRdd),
- None,
+ updateModel,
operationContext)
LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
val (rows, loadResult) = insertData(loadParams)
@@ -439,6 +449,13 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
def insertData(loadParams: CarbonLoadParams): (Seq[Row],
LoadMetadataDetails) = {
var rows = Seq.empty[Row]
+ val loadDataFrame = if (updateModel.isDefined &&
!updateModel.get.loadAsNewSegment) {
+ // TODO: handle the update flow for new insert into flow without
converter step
+ throw new UnsupportedOperationException(
+ "Update flow is not supported without no converter step yet.")
+ } else {
+ Some(dataFrame)
+ }
val table =
loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var loadResult : LoadMetadataDetails = null
if (table.isHivePartitionTable) {
@@ -449,9 +466,9 @@ case class CarbonInsertIntoCommand(databaseNameOp:
Option[String],
loadParams.partitionStatus,
isOverwriteTable,
loadParams.hadoopConf,
- None,
+ loadDataFrame,
loadParams.scanResultRDD,
- None,
+ updateModel,
operationContext)
}
(rows, loadResult)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index bdc3043..5e75c56 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -26,18 +26,17 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{Job, JobID, TaskAttemptID, TaskID,
TaskType}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.sql.{AnalysisException,
CarbonDatasourceHadoopRelation, CarbonUtils, Column, DataFrame, Dataset, Row,
SparkSession}
-import
org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, Column,
DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
GenericInternalRow, GenericRowWithSchema}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo,
Expression, GenericInternalRow, GenericRowWithSchema}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors,
UpdateTableModel}
-import org.apache.spark.sql.execution.command.management.CarbonInsertIntoWithDf
+import
org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
-import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.types.{IntegerType, StringType, StructField,
StructType}
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +51,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -79,15 +79,15 @@ case class CarbonMergeDataSetCommand(
*
*/
override def processData(sparkSession: SparkSession): Seq[Row] = {
- val rltn = CarbonUtils.collectCarbonRelation(targetDsOri.logicalPlan)
+ val relations = CarbonUtils.collectCarbonRelation(targetDsOri.logicalPlan)
// Target dataset must be backed by carbondata table.
- if (rltn.length != 1) {
+ if (relations.length != 1) {
throw new UnsupportedOperationException(
"Carbon table supposed to be present in merge dataset")
}
// validate the merge matches and actions.
validateMergeActions(mergeMatches, targetDsOri, sparkSession)
- val carbonTable = rltn.head.carbonRelation.carbonTable
+ val carbonTable = relations.head.carbonRelation.carbonTable
val hasDelAction = mergeMatches.matchList
.exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
val hasUpdateAction = mergeMatches.matchList
@@ -106,18 +106,34 @@ case class CarbonMergeDataSetCommand(
// decide join type based on match conditions
val joinType = decideJoinType
+ val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left
+ .asInstanceOf[UnresolvedAttribute].nameParts.tail.head
+ // repartition the srsDs, if the target has bucketing and the bucketing
column and join column
+ // are same
+ val repartitionedSrcDs =
+ if (carbonTable.getBucketingInfo != null &&
+ carbonTable.getBucketingInfo
+ .getListOfColumns
+ .asScala
+ .exists(_.getColumnName.equalsIgnoreCase(joinColumn))) {
+ srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges,
srcDS.col(joinColumn))
+ } else {
+ srcDS
+ }
// Add the getTupleId() udf to get the tuple id to generate delete delta.
val frame =
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
- .join(srcDS.withColumn("exist_on_src", lit(1)), mergeMatches.joinExpr,
joinType)
+ .join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)),
+ mergeMatches.joinExpr,
+ joinType)
.withColumn(status_on_mergeds, condition)
if (LOGGER.isDebugEnabled) {
frame.explain()
}
val tableCols =
-
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).
+ carbonTable.getCreateOrderColumn.asScala.map(_.getColName).
filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
val header = tableCols.mkString(",")
@@ -126,19 +142,19 @@ case class CarbonMergeDataSetCommand(
case u: UpdateAction => MergeProjection(tableCols,
status_on_mergeds,
frame,
- rltn.head,
+ relations.head,
sparkSession,
u)
case i: InsertAction => MergeProjection(tableCols,
status_on_mergeds,
frame,
- rltn.head,
+ relations.head,
sparkSession,
i)
case d: DeleteAction => MergeProjection(tableCols,
status_on_mergeds,
frame,
- rltn.head,
+ relations.head,
sparkSession,
d)
case _ => null
@@ -151,7 +167,7 @@ case class CarbonMergeDataSetCommand(
createLongAccumulator("updatedRows"),
createLongAccumulator("deletedRows"))
val targetSchema = StructType(tableCols.map { f =>
- rltn.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get
+ relations.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get
} ++ Seq(StructField(status_on_mergeds, IntegerType)))
val (processedRDD, deltaPath) = processIUD(sparkSession, frame,
carbonTable, projections,
targetSchema, stats)
@@ -170,7 +186,7 @@ case class CarbonMergeDataSetCommand(
loadDF.cache()
val count = loadDF.count()
val updateTableModel = if (FileFactory.isFileExist(deltaPath)) {
- val deltaRdd = sparkSession.read.format("carbon").load(deltaPath).rdd
+ val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath)
val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava,
@@ -194,29 +210,32 @@ case class CarbonMergeDataSetCommand(
tuple._2.asJava)
}
}
- Some(UpdateTableModel(true, trxMgr.getLatestTrx,
- executorErrors, tuple._2, true))
+ Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
+ executorErrors, tuple._2, loadAsNewSegment = true))
} else {
None
}
- CarbonInsertIntoWithDf(
- databaseNameOp = Some(carbonTable.getDatabaseName),
+ val dataFrame = loadDF.select(tableCols.map(col): _*)
+ CarbonInsertIntoCommand(databaseNameOp = Some(carbonTable.getDatabaseName),
tableName = carbonTable.getTableName,
- options = Map("fileheader" -> header, "sort_scope" -> "nosort"),
+ options = Map("fileheader" -> header, "sort_scope" -> "no_sort"),
isOverwriteTable = false,
- dataFrame = loadDF.select(tableCols.map(col): _*),
- updateModel = updateTableModel,
- tableInfoOp = Some(carbonTable.getTableInfo)).process(sparkSession)
-
+ dataFrame.queryExecution.logical,
+ carbonTable.getTableInfo,
+ Map.empty,
+ Map.empty,
+ new OperationContext,
+ updateTableModel
+ ).run(sparkSession)
LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
LOGGER.info(
" Time taken to merge data :: " + (System.currentTimeMillis() - st))
- // Load the history table if the insert history table action is added by
user.
- HistoryTableLoadHelper.loadHistoryTable(sparkSession, rltn.head,
carbonTable,
+ // Load the history table if the insert history table action is added by
user.
+ HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head,
carbonTable,
trxMgr, mutationAction, mergeMatches)
// Do IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(
@@ -262,7 +281,7 @@ case class CarbonMergeDataSetCommand(
carbonTable: CarbonTable,
projections: Seq[Seq[MergeProjection]],
targetSchema: StructType,
- stats: Stats) = {
+ stats: Stats): (RDD[InternalRow], String) = {
val frameCols = frame.queryExecution.analyzed.output
val status = frameCols.length - 1
val tupleId = frameCols.zipWithIndex
@@ -275,37 +294,26 @@ case class CarbonMergeDataSetCommand(
job.setOutputValueClass(classOf[InternalRow])
val uuid = UUID.randomUUID.toString
job.setJobID(new JobID(uuid, 0))
- val path = carbonTable.getTablePath + "/" + job.getJobID
+ val path = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
+ "avro"
FileOutputFormat.setOutputPath(job, new Path(path))
val schema =
org.apache.spark.sql.types.StructType(Seq(
StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
StringType),
StructField(status_on_mergeds, IntegerType)))
- val factory =
- new SparkCarbonFileFormat().prepareWrite(sparkSession, job,
- Map(), schema)
+ val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job,
schema)
val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext,
job.getConfiguration)
-
(frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
- mapPartitionsWithIndex { case (index, iter) =>
- var directlyWriteDataToHdfs = CarbonProperties.getInstance()
- .getProperty(CarbonLoadOptionConstants
- .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants
- .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
- CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
- .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")
- val confB = config.value.value
- val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
- val attemptID = new TaskAttemptID(task, index)
- val context = new TaskAttemptContextImpl(confB, attemptID)
- val writer = factory.newInstance(path, schema, context)
- val projLen = projections.length
+ (frame.rdd.mapPartitionsWithIndex { case (index, iter) =>
+ val confB = config.value.value
+ val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+ val attemptID = new TaskAttemptID(task, index)
+ val context = new TaskAttemptContextImpl(confB, attemptID)
+ val writer = factory.newInstance(path +
CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+ schema, context)
+ val projLen = projections.length
new Iterator[InternalRow] {
val queue = new util.LinkedList[InternalRow]()
override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext)
true else {
writer.close()
- // revert load direct write to store path after insert
-
CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
- .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
directlyWriteDataToHdfs)
false
}
@@ -365,7 +373,8 @@ case class CarbonMergeDataSetCommand(
private def createLongAccumulator(name: String) = {
val acc = new LongAccumulator
acc.setValue(0)
- acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name),
false)
+ acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name),
countFailedValues
+ = false)
AccumulatorContext.register(acc)
acc
}
@@ -541,7 +550,7 @@ case class CarbonMergeDataSetCommand(
}.filter(_ != null)
}
- private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches) = {
+ private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches):
(Boolean, Boolean) = {
val insertHistOfUpdate = mergeMatches.matchList.exists(p =>
p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
&& p.getActions.exists(_.isInstanceOf[UpdateAction]))
@@ -577,7 +586,7 @@ case class CarbonMergeDataSetCommand(
"Not all source columns are mapped for insert action " +
value.insertMap)
}
value.insertMap.foreach { case (k, v) =>
- selectAttributes(v.expr, existingDs, sparkSession, true)
+ selectAttributes(v.expr, existingDs, sparkSession, throwError = true)
}
}
}
diff --git
a/integration/spark/src/main/spark2.3/com/databricks/spark/avro/AvroWriter.scala
b/integration/spark/src/main/spark2.3/com/databricks/spark/avro/AvroWriter.scala
new file mode 100644
index 0000000..8de31be
--- /dev/null
+++
b/integration/spark/src/main/spark2.3/com/databricks/spark/avro/AvroWriter.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.databricks.spark.avro
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+
+/**
+ * This class is to get the avro writer from databricks avro module, as its
not present in spark2.3
+ * and spark-avro module is included in spark project from spark-2.4. So for
spark-2.4, we use Avro
+ * writer from spark project.
+ */
+object AvroWriter {
+
+ def getWriter(spark: org.apache.spark.sql.SparkSession,
+ job: org.apache.hadoop.mapreduce.Job,
+ dataSchema: org.apache.spark.sql.types.StructType,
+ options: scala.Predef.Map[scala.Predef.String, scala.Predef.String] =
Map.empty)
+ : OutputWriterFactory = {
+ new DefaultSource().prepareWrite(spark, job,
+ options, dataSchema)
+ }
+}
+
+/**
+ * This reads the avro files from the given path and return the RDD[Row]
+ */
+object AvroReader {
+
+ def readAvro(spark: org.apache.spark.sql.SparkSession, deltaPath: String):
RDD[Row] = {
+ spark.sparkContext
+ .hadoopConfiguration
+ .set("avro.mapred.ignore.inputs.without.extension", "false")
+ spark.read.avro(deltaPath).rdd
+ }
+}
diff --git
a/integration/spark/src/main/spark2.3/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
b/integration/spark/src/main/spark2.3/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
new file mode 100644
index 0000000..616f052
--- /dev/null
+++
b/integration/spark/src/main/spark2.3/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import com.databricks.spark.avro.{AvroReader, AvroWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+
+object AvroFileFormatFactory {
+
+ /**
+ * return the avro writer to write the avro files
+ * @return avro writer
+ */
+ def getAvroWriter(spark: org.apache.spark.sql.SparkSession,
+ job: org.apache.hadoop.mapreduce.Job,
+ dataSchema: org.apache.spark.sql.types.StructType,
+ options: scala.Predef.Map[scala.Predef.String, scala.Predef.String] =
Map.empty)
+ : OutputWriterFactory = {
+ AvroWriter.getWriter(spark, job, dataSchema, options)
+ }
+
+ /**
+ * Reads the avro files present at the given path
+ * @param deltaPath path to read the avro files from.
+ * @return RDD[Row]
+ */
+ def readAvro(spark: org.apache.spark.sql.SparkSession, deltaPath: String):
RDD[Row] = {
+ spark.sparkContext
+ .hadoopConfiguration
+ .set("avro.mapred.ignore.inputs.without.extension", "false")
+ AvroReader.readAvro(spark, deltaPath)
+ }
+}
diff --git
a/integration/spark/src/main/spark2.4/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
b/integration/spark/src/main/spark2.4/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
new file mode 100644
index 0000000..7fcd5dc
--- /dev/null
+++
b/integration/spark/src/main/spark2.4/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+
+object AvroFileFormatFactory {
+
+ /**
+ * return the avro writer to write the avro files
+ * @return avro writer
+ */
+ def getAvroWriter(spark: org.apache.spark.sql.SparkSession,
+ job: org.apache.hadoop.mapreduce.Job,
+ dataSchema: org.apache.spark.sql.types.StructType,
+ options: scala.Predef.Map[scala.Predef.String, scala.Predef.String] =
Map.empty)
+ : OutputWriterFactory = {
+ new AvroFileFormat().prepareWrite(spark, job, options, dataSchema)
+ }
+
+ /**
+ * Reads the avro files present at the given path
+ * @param deltaPath path to read the avro files from.
+ * @return RDD[Row]
+ */
+ def readAvro(spark: org.apache.spark.sql.SparkSession, deltaPath: String):
RDD[Row] = {
+ spark.sparkContext
+ .hadoopConfiguration
+ .set("avro.mapred.ignore.inputs.without.extension", "false")
+ spark.read.format("avro").load(s"$deltaPath").rdd
+ }
+}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 916846a..56b98a3 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -91,6 +91,11 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll
{
(dwSelframe, odsframe)
}
+ private def initializeWithBucketing = {
+ sql("create table order(id string, name string, c_name string, quantity
int, price int, state int) stored as carbondata
tblproperties('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='id')")
+ initialize
+ }
+
private def initializeGloabalSort = {
val initframe = generateData(10)
initframe.write
@@ -821,11 +826,14 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched().
insertExpr(insertMap).execute()
val sdf = new SimpleDateFormat("yyyy-MM-dd")
+ // in case of cdc, the insert into flow goes to no converter step to save
time as incoming data
+ // from source will be correct, we wont use the table level timestamp
format or load level for
+ // the insert into of cdc data.
checkAnswer(
sql("select date,time from order where id = 'id1'"),
Seq(
Row(new Date(sdf.parse("2015-07-23").getTime),
Timestamp.valueOf("2015-03-03 12:25:00")),
- Row(new Date(sdf.parse("2015-07-23").getTime),
Timestamp.valueOf("2015-05-23 10:30:00"))
+ Row(new Date(sdf.parse("2015-07-23").getTime),
Timestamp.valueOf("2015-05-23 10:30:30"))
))
checkAnswer(
sql("select date,time from order where id = 'id11'"),
@@ -834,6 +842,36 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
))
}
+ test("test merge update and insert with condition and expression and delete
action with target table as bucketing") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initializeWithBucketing
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> expr("B.price * 100"),
+ col("state") -> col("B.state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)))
+ matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+ matches ++=
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 2)
+ checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
+ }
+
case class Target (id: Int, value: String, remark: String, mdt: String)
case class Change (id: Int, value: String, change_type: String, mdt: String)
private val numInitialRows = 10