ajantha-bhat commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r676327406



##########
File path: 
core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
##########
@@ -330,6 +339,11 @@ public void readFields(DataInput in) throws IOException {
         missingSISegments.add(in.readUTF());
       }
     }
+    this.isCDCJob = in.readBoolean();

Review comment:
       Could this cause a compatibility issue? If the index server has an older 
version of this object (IndexInputFormat) in its cache, deserializing it could 
throw an EOFException, because the old version never wrote this boolean field.
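
To make the concern concrete, here is a minimal, self-contained sketch using 
only `java.io`. It does not use the real `IndexInputFormat`; the class name 
`TrailingFieldCompat` and its methods are hypothetical, and the payload is 
reduced to one string plus the optional trailing boolean. It only illustrates 
that a reader expecting the extra field hits `EOFException` on a payload 
written without it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the old/new serialization formats; not CarbonData code.
class TrailingFieldCompat {

  // "Old" writer: one string, no trailing boolean.
  static byte[] writeOldFormat(String segment) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(segment);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // "New" writer: the same string followed by the new boolean field.
  static byte[] writeNewFormat(String segment, boolean isCdcJob) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(segment);
      out.writeBoolean(isCdcJob);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // "New" reader: always expects the boolean after the string.
  // Returns true when the read runs off the end of the payload.
  static boolean newReaderHitsEof(byte[] payload) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
      in.readUTF();
      in.readBoolean(); // an old payload has no bytes left at this point
      return false;
    } catch (EOFException e) {
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(newReaderHitsEof(writeOldFormat("segment_0")));
    System.out.println(newReaderHitsEof(writeNewFormat("segment_0", true)));
  }
}
```

Whether this can actually happen depends on whether a serialized object from 
an older version can ever reach a newer reader, which is the question above.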

##########
File path: 
core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -163,20 +171,36 @@ public void setColumnSchema(List<ColumnSchema> 
columnSchema) {
    * Method to serialize extended blocklet and input split for index server
    * DataFormat
    * <Extended Blocklet data><Carbon input split serializeData 
length><CarbonInputSplitData>
-   * @param out
-   * @param uniqueLocation
+   * @param out data output to write the primitives to extended blocklet
+   * @param uniqueLocation location to write the blocklet in case of 
distributed pruning, ex: Lucene
+   * @param isExternalPath identification for the externam segment
    * @throws IOException
    */
-  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, 
boolean isCountJob,
-      boolean isExternalPath)
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation,
+      IndexInputFormat indexInputFormat, boolean isExternalPath)
       throws IOException {
     super.write(out);
-    if (isCountJob) {
+    if (indexInputFormat.isCountStarJob()) {
       // In CarbonInputSplit, getDetailInfo() is a lazy call. we want to avoid 
this during
       // countStar query. As rowCount is filled inside getDetailInfo(). In 
countStar case we may
       // not have proper row count. So, always take row count from indexRow.
       
out.writeLong(inputSplit.getIndexRow().getInt(BlockletIndexRowIndexes.ROW_COUNT_INDEX));
       out.writeUTF(inputSplit.getSegmentId());
+    } else if (indexInputFormat.getCdcVO() != null) {
+      // In case of CDC, we ust need the filepath and the min max of the 
blocklet, so just serialize

Review comment:
       ```suggestion
         // In case of CDC, we just need the filepath and the min max of the 
blocklet, so just serialize
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/range/MinMaxNode.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Each node to be inserted in BlockMinMaxTree for pruning.
+ */
+public class MinMaxNode implements Serializable {
+
+  // list of files present in same range of min max of this node
+  private List<String> filePaths = new ArrayList<>();
+
+  private Object min;
+
+  private Object max;
+
+  private MinMaxNode leftSubTree;

Review comment:
       `leftSubTree` already stores its own `min` and `max`; why store them 
again in the parent node?

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VO object which contains the info used in CDC case during cache loading in 
Index server
+ */
+public class CdcVO implements Serializable, Writable {
+
+  /**
+   * This collection contains column to index mapping which give info about 
the index for a column
+   * in IndexRow object to fetch min max
+   */
+  private Map<String, Integer> columnToIndexMap;
+
+  private List<Integer> indexesToFetch;

Review comment:
       So, when `indexesToFetch` is filled by deserialization, will 
`columnToIndexMap` be null?
   
   If at any point both `columnToIndexMap` and `indexesToFetch` are filled, 
then maybe storing the information once in `columnToIndexMap` is enough?

##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/filter/executer/CDCBlockImplicitExecutorImpl.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.Set;
+
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * This filter executor class will be called when the CDC pruning is enabled.
+ */
+public class CDCBlockImplicitExecutorImpl implements FilterExecutor, 
ImplicitColumnFilterExecutor {
+
+  private final Set<String> blocksToScan;
+
+  public CDCBlockImplicitExecutorImpl(Set<String> blocksToScan) {
+    this.blocksToScan = blocksToScan;
+  }
+
+  @Override
+  public BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, 
byte[][] minValue,
+      String uniqueBlockPath, boolean[] isMinMaxSet) {
+    boolean isScanRequired = false;

Review comment:
       No need for this variable; set the bit in the bitmap directly if the 
path is in the set, and return.
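
A minimal sketch of that shape, assuming a one-bit `BitSet` where bit 0 means 
"scan this block". The class `BlockPathFilterSketch` and the method 
`isScanRequired` are hypothetical names for illustration; the rest of the real 
method body is not shown in this hunk, so this is not a drop-in replacement.

```java
import java.util.BitSet;
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for the executor; only the path check is modelled.
class BlockPathFilterSketch {

  private final Set<String> blocksToScan;

  BlockPathFilterSketch(Set<String> blocksToScan) {
    this.blocksToScan = blocksToScan;
  }

  // Returns a one-bit BitSet; bit 0 is set only when the path is in the set.
  BitSet isScanRequired(String uniqueBlockPath) {
    BitSet bitSet = new BitSet(1);
    if (blocksToScan.contains(uniqueBlockPath)) {
      bitSet.set(0);
    }
    return bitSet;
  }

  public static void main(String[] args) {
    BlockPathFilterSketch filter =
        new BlockPathFilterSketch(Collections.singleton("part-0-0.carbondata"));
    System.out.println(filter.isScanRequired("part-0-0.carbondata").get(0));
    System.out.println(filter.isScanRequired("part-0-1.carbondata").isEmpty());
  }
}
```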

##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/CDCBlockImplicitExpression.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.expression.conditional;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+/**
+ * This expression will be added to Index filter when CDC pruning is enabled.
+ */
+public class CDCBlockImplicitExpression extends Expression {
+
+  Set<String> blocksToScan;
+
+  public CDCBlockImplicitExpression(String blockPathValues) {
+    blocksToScan =
+        
Arrays.stream(blockPathValues.split(",")).map(String::trim).collect(Collectors.toSet());
+  }
+
+  @Override
+  public ExpressionResult evaluate(RowIntf value) {
+    throw new UnsupportedOperationException("Not allowed on Implicit 
expression");
+  }
+
+  @Override
+  public ExpressionType getFilterExpressionType() {
+    return ExpressionType.IMPLICIT;
+  }
+
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+

Review comment:
       Better to throw `UnsupportedOperationException` here?

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -79,113 +87,514 @@ case class CarbonMergeDataSetCommand(
    */
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val relations = 
CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
-    // Target dataset must be backed by carbondata table.
+    val st = System.currentTimeMillis()
+    val targetDsAliasName = targetDsOri.logicalPlan match {
+      case alias: SubqueryAlias =>
+        alias.alias
+      case _ => null
+    }
+    val sourceAliasName = srcDS.logicalPlan match {
+      case alias: SubqueryAlias =>
+        alias.alias
+      case _ => null
+    }
     if (relations.length != 1) {
       throw new UnsupportedOperationException(
         "Carbon table supposed to be present in merge dataset")
     }
-    // validate the merge matches and actions.
-    validateMergeActions(mergeMatches, targetDsOri, sparkSession)
-    val carbonTable = relations.head.carbonRelation.carbonTable
-    val hasDelAction = mergeMatches.matchList
-      .exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
-    val hasUpdateAction = mergeMatches.matchList
-      .exists(_.getActions.exists(_.isInstanceOf[UpdateAction]))
-    val (insertHistOfUpdate, insertHistOfDelete) = 
getInsertHistoryStatus(mergeMatches)
-    // Get all the required columns of targetDS by going through all match 
conditions and actions.
-    val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, 
sparkSession)
+    // Target dataset must be backed by carbondata table.
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
     // select only the required columns, it can avoid lot of and shuffling.
-    val targetDs = targetDsOri.select(columns: _*)
-    // Update the update mapping with unfilled columns.From here on system 
assumes all mappings
-    // are existed.
-    mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs)
-    // Lets generate all conditions combinations as one column and add them as 
'status'.
-    val condition = generateStatusColumnWithAllCombinations(mergeMatches)
-
-    // decide join type based on match conditions
-    val joinType = decideJoinType
+    val targetDs = if (mergeMatches == null && operationType != null) {
+      targetDsOri.select(keyColumn)
+    } else {
+      // Get all the required columns of targetDS by going through all match 
conditions and actions.
+      val columns = getSelectExpressionsOnExistingDF(targetDsOri, 
mergeMatches, sparkSession)
+      targetDsOri.select(columns: _*)
+    }
+    // decide join type based on match conditions or based on merge operation 
type
+    val joinType = if (mergeMatches == null && operationType != null) {
+      MergeOperationType.withName(operationType.toUpperCase) match {
+        case MergeOperationType.UPDATE | MergeOperationType.DELETE =>
+          "inner"
+        case MergeOperationType.UPSERT =>
+          "right_outer"
+        case MergeOperationType.INSERT =>
+          null
+      }
+    } else {
+      decideJoinType
+    }
 
-    val joinColumns = mergeMatches.joinExpr.expr.collect {
-      case unresolvedAttribute: UnresolvedAttribute if 
unresolvedAttribute.nameParts.nonEmpty =>
-        // Let's say the join condition will be something like A.id = B.id, 
then it will be an
-        // EqualTo expression, with left expression as 
UnresolvedAttribute(A.id) and right will
-        // be a Literal(B.id). Since we need the column name here, we can 
directly check the left
-        // which is UnresolvedAttribute. We take nameparts from 
UnresolvedAttribute which is an
-        // ArrayBuffer containing "A" and "id", since "id" is column name, we 
take
-        // nameparts.tail.head which gives us "id" column name.
-        unresolvedAttribute.nameParts.tail.head
-    }.distinct
+    val joinColumns = if (mergeMatches == null) {
+      Seq(keyColumn)
+    } else {
+      mergeMatches.joinExpr.expr.collect {
+        case unresolvedAttribute: UnresolvedAttribute if 
unresolvedAttribute.nameParts.nonEmpty =>
+          // Let's say the join condition will be something like A.id = B.id, 
then it will be an
+          // EqualTo expression, with left expression as 
UnresolvedAttribute(A.id) and right will
+          // be a Literal(B.id). Since we need the column name here, we can 
directly check the left
+          // which is UnresolvedAttribute. We take nameparts from 
UnresolvedAttribute which is an
+          // ArrayBuffer containing "A" and "id", since "id" is column name, 
we take
+          // nameparts.tail.head which gives us "id" column name.
+          unresolvedAttribute.nameParts.tail.head
+      }.distinct
+    }
 
     // repartition the srsDs, if the target has bucketing and the bucketing 
columns contains join
     // columns
     val repartitionedSrcDs =
-      if (carbonTable.getBucketingInfo != null &&
-          carbonTable.getBucketingInfo
+      if (targetCarbonTable.getBucketingInfo != null &&
+          targetCarbonTable.getBucketingInfo
             .getListOfColumns
             .asScala
             .map(_.getColumnName).containsSlice(joinColumns)) {
-        srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges,
+        srcDS.repartition(targetCarbonTable.getBucketingInfo.getNumOfRanges,
           joinColumns.map(srcDS.col): _*)
       } else {
       srcDS
+      }
+
+    // cache the source data as we will be scanning multiple times
+    repartitionedSrcDs.cache()

Review comment:
       Can you break this function up? It is currently too long and hard to 
maintain.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET;
+
+/**
+ * This class prepares a tree for pruning using min-max of block
+ */
+public class BlockMinMaxTree implements Serializable {
+
+  private MinMaxNode root;
+
+  private final boolean isPrimitiveAndNotDate;
+  private final boolean isDimensionColumn;
+  private final DataType joinDataType;
+  private final SerializableComparator comparator;
+
+  public BlockMinMaxTree(boolean isPrimitiveAndNotDate, boolean 
isDimensionColumn,
+      DataType joinDataType, SerializableComparator comparator) {
+    this.isPrimitiveAndNotDate = isPrimitiveAndNotDate;
+    this.isDimensionColumn = isDimensionColumn;
+    this.joinDataType = joinDataType;
+    this.comparator = comparator;
+  }
+
+  public MinMaxNode getRoot() {
+    return root;
+  }
+
+  public void insert(MinMaxNode newMinMaxNode) {
+    root = insert(getRoot(), newMinMaxNode);
+  }
+
+  private MinMaxNode insert(MinMaxNode root, MinMaxNode newMinMaxNode) {
+    /* 1. check if the root null, then insert and make new node
+     * 2. check if the new node completely overlaps with the root, where 
minCompare and maxCompare
+     * both are zero, if yes add the filepaths and return
+     * 3. if root is less than new node, check if the root has right subtree,
+     *    if(yes) {
+     *       replace the right node with the newnode's min and max based on 
comparison and then
+     *        call insert with right node as new root and newnode
+     *         insert(root.getRight, newnode)
+     *     } else {
+     *       make the new node as right node and set right node and return
+     *     }
+     * 4. if root is more than new node, check if the root has left subtree,
+     *    if(yes) {
+     *       replace the left node with the newnode's min and max based on 
comparison and then
+     *        call insert with left node as new root and newnode
+     *         insert(root.getLeft, newnode)
+     *     } else {
+     *       make the new node as left node and set left node and return
+     *     }
+     * */
+    if (root == null) {
+      root = newMinMaxNode;
+      return root;
+    }
+
+    if (compareNodesBasedOnMinMax(root, newMinMaxNode) == 0) {
+      root.addFilePats(newMinMaxNode.getFilePaths());
+      return root;
+    }
+
+    if (compareNodesBasedOnMinMax(root, newMinMaxNode) < 0) {
+      if (root.getRightSubTree() == null) {
+        root.setRightSubTree(newMinMaxNode);
+        root.setRightSubTreeMax(newMinMaxNode.getMax());
+        root.setRightSubTreeMin(newMinMaxNode.getMin());
+      } else {
+        if (compareMinMax(root.getRightSubTreeMax(), newMinMaxNode.getMax()) < 
0) {
+          root.setRightSubTreeMax(newMinMaxNode.getMax());
+        }
+        if (compareMinMax(root.getRightSubTreeMin(), newMinMaxNode.getMin()) > 
0) {
+          root.setRightSubTreeMin(newMinMaxNode.getMin());
+        }
+        insert(root.getRightSubTree(), newMinMaxNode);
+      }
+    } else {
+      if (root.getLeftSubTree() == null) {
+        root.setLeftSubTree(newMinMaxNode);
+        root.setLeftSubTreeMax(newMinMaxNode.getMax());
+        root.setLeftSubTreeMin(newMinMaxNode.getMin());
+      } else {
+        if (compareMinMax(root.getLeftSubTreeMax(), newMinMaxNode.getMax()) < 
0) {
+          root.setLeftSubTreeMax(newMinMaxNode.getMax());
+        }
+        if (compareMinMax(root.getLeftSubTreeMin(), newMinMaxNode.getMin()) > 
0) {
+          root.setLeftSubTreeMin(newMinMaxNode.getMin());
+        }
+        insert(root.getLeftSubTree(), newMinMaxNode);
+      }
+    }
+    return root;
+  }
+
+  private int compareNodesBasedOnMinMax(MinMaxNode root, MinMaxNode 
newMinMaxNode) {
+    int minCompare = compareMinMax(root.getMin(), newMinMaxNode.getMin());
+    int maxCompare = compareMinMax(root.getMax(), newMinMaxNode.getMax());
+    if (minCompare == 0) {
+      return maxCompare;
+    } else {
+      return minCompare;
+    }
+  }
+
+  private int compareMinMax(Object key1, Object key2) {
+    if (isDimensionColumn) {
+      if (isPrimitiveAndNotDate) {
+        return comparator.compare(key1, key2);
+      } else {
+        return ByteUtil.UnsafeComparer.INSTANCE

Review comment:
       Why does date also use the unsafe byte array comparator?
   
   Also: if the type is string, use the unsafe comparator; otherwise, use the 
provided comparator. The else below and the if above can then be combined.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -207,19 +231,35 @@ public void serializeData(DataOutput out, Map<String, 
Short> uniqueLocation, boo
 
   /**
    * Method to deserialize extended blocklet and input split for index server
-   * @param in
-   * @param locations
-   * @param tablePath
+   * @param in data input stream to read the primitives of extended blocklet
+   * @param locations locations of the input split
+   * @param tablePath carbon table path
    * @throws IOException
    */
   public void deserializeFields(DataInput in, String[] locations, String 
tablePath,
-      boolean isCountJob)
+      boolean isCountJob, CdcVO cdcVO)
       throws IOException {
     super.readFields(in);
     if (isCountJob) {
       count = in.readLong();
       segmentNo = in.readUTF();
       return;
+    } else if (cdcVO != null) {
+      filePath = in.readUTF();
+      this.columnToMinMaxMapping = new HashMap<>();
+      for (Map.Entry<String, Integer> entry : 
cdcVO.getColumnToIndexMap().entrySet()) {

Review comment:
       As you just need keys, please use `cdcVO.getColumnToIndexMap().keySet()`
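
For illustration, a small standalone sketch of iterating over keys only. The 
class `KeySetIteration` and the method `columnNames` are hypothetical; the map 
merely mirrors the `Map<String, Integer>` type of `columnToIndexMap`, and what 
the real loop does with each column is not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical example; not part of ExtendedBlocklet.
class KeySetIteration {

  // Collects the column names by iterating keySet() instead of entrySet().
  static List<String> columnNames(Map<String, Integer> columnToIndexMap) {
    List<String> names = new ArrayList<>();
    for (String column : columnToIndexMap.keySet()) {
      names.add(column);
    }
    return names;
  }

  public static void main(String[] args) {
    Map<String, Integer> columnToIndexMap = new LinkedHashMap<>();
    columnToIndexMap.put("key", 0);
    columnToIndexMap.put("value", 1);
    System.out.println(columnNames(columnToIndexMap));
  }
}
```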
   

##########
File path: 
examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object UPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), 
StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", 
"target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("value", StringType))))
+    spark.sql("select * from target").show(false)
+    // upsert API updates a and b, inserts e and g
+    target.as("A").merge(cdc.as("B"), "key", "upsert").execute()

Review comment:
       Merge is also supported through SQL; you can cover that in the test case 
as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
