This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f5346cacf1 Spark 3.4: Output the net changes across snapshots for 
carryover rows in CDC (#7950)
f5346cacf1 is described below

commit f5346cacf17d63257ab433215e1661ed221750d0
Author: Yufei Gu <[email protected]>
AuthorDate: Fri Jun 30 23:36:09 2023 -0700

    Spark 3.4: Output the net changes across snapshots for carryover rows in 
CDC (#7950)
---
 .../TestCreateChangelogViewProcedure.java          |  91 ++++++--
 .../apache/iceberg/spark/ChangelogIterator.java    |  42 ++++
 .../iceberg/spark/ComputeUpdateIterator.java       |   8 +-
 .../iceberg/spark/RemoveCarryoverIterator.java     |  33 +--
 .../iceberg/spark/RemoveNetCarryoverIterator.java  | 129 ++++++++++++
 .../procedures/CreateChangelogViewProcedure.java   |  69 ++++--
 .../iceberg/spark/TestChangelogIterator.java       | 231 +++++++++++++--------
 7 files changed, 456 insertions(+), 147 deletions(-)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
index dc12b0145d..0a4a1073c3 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.junit.Assert.assertThrows;
+
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.ChangelogOperation;
@@ -45,13 +47,13 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  public void createTableWith2Columns() {
+  public void createTableWithTwoColumns() {
     sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
     sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 
1);
     sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
   }
 
-  private void createTableWith3Columns() {
+  private void createTableWithThreeColumns() {
     sql("CREATE TABLE %s (id INT, data STRING, age INT) USING iceberg", 
tableName);
     sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 
1);
     sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
@@ -65,7 +67,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testCustomizedViewName() {
-    createTableWith2Columns();
+    createTableWithTwoColumns();
     sql("INSERT INTO %s VALUES (1, 'a')", tableName);
     sql("INSERT INTO %s VALUES (2, 'b')", tableName);
 
@@ -98,7 +100,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testNoSnapshotIdInput() {
-    createTableWith2Columns();
+    createTableWithTwoColumns();
     sql("INSERT INTO %s VALUES (1, 'a')", tableName);
     Table table = validationCatalog.loadTable(tableIdent);
     Snapshot snap0 = table.currentSnapshot();
@@ -129,7 +131,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testTimestampsBasedQuery() {
-    createTableWith2Columns();
+    createTableWithTwoColumns();
     long beginning = System.currentTimeMillis();
 
     sql("INSERT INTO %s VALUES (1, 'a')", tableName);
@@ -189,7 +191,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testWithCarryovers() {
-    createTableWith2Columns();
+    createTableWithTwoColumns();
     sql("INSERT INTO %s VALUES (1, 'a')", tableName);
     Table table = validationCatalog.loadTable(tableIdent);
     Snapshot snap0 = table.currentSnapshot();
@@ -224,7 +226,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testUpdate() {
-    createTableWith2Columns();
+    createTableWithTwoColumns();
     sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
     sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
 
@@ -283,7 +285,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testUpdateWithFilter() {
-    createTableWith2Columns();
+    createTableWithTwoColumns();
     sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
     sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
 
@@ -315,7 +317,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testUpdateWithMultipleIdentifierColumns() {
-    createTableWith3Columns();
+    createTableWithThreeColumns();
 
     sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
     Table table = validationCatalog.loadTable(tableIdent);
@@ -347,7 +349,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testRemoveCarryOvers() {
-    createTableWith3Columns();
+    createTableWithThreeColumns();
 
     sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
     Table table = validationCatalog.loadTable(tableIdent);
@@ -381,7 +383,7 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
 
   @Test
   public void testRemoveCarryOversWithoutUpdatedRows() {
-    createTableWith3Columns();
+    createTableWithThreeColumns();
 
     sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
     Table table = validationCatalog.loadTable(tableIdent);
@@ -411,9 +413,74 @@ public class TestCreateChangelogViewProcedure extends 
SparkExtensionsTestBase {
         sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
+  @Test
+  public void testNetChangesWithRemoveCarryOvers() {
+    // partitioned by id
+    createTableWithThreeColumns();
+
+    // insert rows: (1, 'a', 12) (2, 'b', 11) (2, 'e', 12)
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // delete rows: (2, 'b', 11) (2, 'e', 12)
+    // insert rows: (3, 'c', 13) (2, 'd', 11) (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", 
tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // delete rows: (2, 'd', 11) (2, 'e', 12) (3, 'c', 13)
+    // insert rows: (3, 'c', 15) (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 15), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // test with all snapshots
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', net_changes 
=> true)",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(3, "c", 15, INSERT, 2, snap3.snapshotId()),
+            row(2, "e", 12, INSERT, 2, snap3.snapshotId())),
+        sql("select * from %s order by _change_ordinal, data", viewName));
+
+    // test with snap2 and snap3
+    sql(
+        "CALL %s.system.create_changelog_view(table => '%s', "
+            + "options => map('start-snapshot-id','%s'), "
+            + "net_changes => true)",
+        catalogName, tableName, snap1.snapshotId());
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", 11, DELETE, 0, snap2.snapshotId()),
+            row(3, "c", 15, INSERT, 1, snap3.snapshotId())),
+        sql("select * from %s order by _change_ordinal, data", viewName));
+  }
+
+  @Test
+  public void testNetChangesWithComputeUpdates() {
+    createTableWithTwoColumns();
+    assertThrows(
+        "Should fail because net_changes is not supported with computing 
updates",
+        IllegalArgumentException.class,
+        () ->
+            sql(
+                "CALL %s.system.create_changelog_view(table => '%s', 
identifier_columns => array('id'), net_changes => true)",
+                catalogName, tableName));
+  }
+
   @Test
   public void testNotRemoveCarryOvers() {
-    createTableWith3Columns();
+    createTableWithThreeColumns();
 
     sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
     Table table = validationCatalog.loadTable(tableIdent);
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
index 5f30c5fd4e..cc44b1f399 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
@@ -20,8 +20,10 @@ package org.apache.iceberg.spark;
 
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.iceberg.ChangelogOperation;
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
@@ -35,9 +37,11 @@ public abstract class ChangelogIterator implements 
Iterator<Row> {
 
   private final Iterator<Row> rowIterator;
   private final int changeTypeIndex;
+  private final StructType rowType;
 
   protected ChangelogIterator(Iterator<Row> rowIterator, StructType rowType) {
     this.rowIterator = rowIterator;
+    this.rowType = rowType;
     this.changeTypeIndex = 
rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name());
   }
 
@@ -45,6 +49,16 @@ public abstract class ChangelogIterator implements 
Iterator<Row> {
     return changeTypeIndex;
   }
 
+  protected StructType rowType() {
+    return rowType;
+  }
+
+  protected String changeType(Row row) {
+    String changeType = row.getString(changeTypeIndex());
+    Preconditions.checkNotNull(changeType, "Change type should not be null");
+    return changeType;
+  }
+
   protected Iterator<Row> rowIterator() {
     return rowIterator;
   }
@@ -79,7 +93,35 @@ public abstract class ChangelogIterator implements 
Iterator<Row> {
     return Iterators.filter(changelogIterator, Objects::nonNull);
   }
 
+  public static Iterator<Row> removeNetCarryovers(Iterator<Row> rowIterator, 
StructType rowType) {
+    ChangelogIterator changelogIterator = new 
RemoveNetCarryoverIterator(rowIterator, rowType);
+    return Iterators.filter(changelogIterator, Objects::nonNull);
+  }
+
+  protected boolean isSameRecord(Row currentRow, Row nextRow, int[] 
indicesToIdentifySameRow) {
+    for (int idx : indicesToIdentifySameRow) {
+      if (isDifferentValue(currentRow, nextRow, idx)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   protected boolean isDifferentValue(Row currentRow, Row nextRow, int idx) {
     return !Objects.equals(nextRow.get(idx), currentRow.get(idx));
   }
+
+  protected static int[] generateIndicesToIdentifySameRow(
+      int totalColumnCount, Set<Integer> metadataColumnIndices) {
+    int[] indices = new int[totalColumnCount - metadataColumnIndices.size()];
+
+    for (int i = 0, j = 0; i < indices.length; i++) {
+      if (!metadataColumnIndices.contains(i)) {
+        indices[j] = i;
+        j++;
+      }
+    }
+    return indices;
+  }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
index 23e6a19a17..6951c33e51 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
@@ -81,15 +81,13 @@ public class ComputeUpdateIterator extends 
ChangelogIterator {
     // either a cached record which is not an UPDATE or the next record in the 
iterator.
     Row currentRow = currentRow();
 
-    if (currentRow.getString(changeTypeIndex()).equals(DELETE) && 
rowIterator().hasNext()) {
+    if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
       Row nextRow = rowIterator().next();
       cachedRow = nextRow;
 
       if (sameLogicalRow(currentRow, nextRow)) {
-        String nextRowChangeType = nextRow.getString(changeTypeIndex());
-
         Preconditions.checkState(
-            nextRowChangeType.equals(INSERT),
+            changeType(nextRow).equals(INSERT),
             "Cannot compute updates because there are multiple rows with the 
same identifier"
                 + " fields([%s]). Please make sure the rows are unique.",
             String.join(",", identifierFields));
@@ -118,7 +116,7 @@ public class ComputeUpdateIterator extends 
ChangelogIterator {
   }
 
   private boolean cachedUpdateRecord() {
-    return cachedRow != null && 
cachedRow.getString(changeTypeIndex()).equals(UPDATE_AFTER);
+    return cachedRow != null && changeType(cachedRow).equals(UPDATE_AFTER);
   }
 
   private Row currentRow() {
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
index 70b160e13f..2e90dc7749 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark;
 
 import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator {
 
   RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
     super(rowIterator, rowType);
-    this.indicesToIdentifySameRow = 
generateIndicesToIdentifySameRow(rowType.size());
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
   }
 
   @Override
@@ -88,7 +90,7 @@ class RemoveCarryoverIterator extends ChangelogIterator {
     }
 
     // If the current row is a delete row, drain all identical delete rows
-    if (currentRow.getString(changeTypeIndex()).equals(DELETE) && 
rowIterator().hasNext()) {
+    if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
       cachedDeletedRow = currentRow;
       deletedRowCount = 1;
 
@@ -98,8 +100,8 @@ class RemoveCarryoverIterator extends ChangelogIterator {
       // row is the same record
       while (nextRow != null
           && cachedDeletedRow != null
-          && isSameRecord(cachedDeletedRow, nextRow)) {
-        if (nextRow.getString(changeTypeIndex()).equals(INSERT)) {
+          && isSameRecord(cachedDeletedRow, nextRow, 
indicesToIdentifySameRow)) {
+        if (changeType(nextRow).equals(INSERT)) {
           deletedRowCount--;
           if (deletedRowCount == 0) {
             cachedDeletedRow = null;
@@ -139,25 +141,8 @@ class RemoveCarryoverIterator extends ChangelogIterator {
     return cachedDeletedRow != null;
   }
 
-  private int[] generateIndicesToIdentifySameRow(int columnSize) {
-    int[] indices = new int[columnSize - 1];
-    for (int i = 0; i < indices.length; i++) {
-      if (i < changeTypeIndex()) {
-        indices[i] = i;
-      } else {
-        indices[i] = i + 1;
-      }
-    }
-    return indices;
-  }
-
-  private boolean isSameRecord(Row currentRow, Row nextRow) {
-    for (int idx : indicesToIdentifySameRow) {
-      if (isDifferentValue(currentRow, nextRow, idx)) {
-        return false;
-      }
-    }
-
-    return true;
+  private int[] generateIndicesToIdentifySameRow() {
+    Set<Integer> metadataColumnIndices = Sets.newHashSet(changeTypeIndex());
+    return generateIndicesToIdentifySameRow(rowType().size(), 
metadataColumnIndices);
   }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
new file mode 100644
index 0000000000..941e4a4731
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It is 
different from {@link
+ * org.apache.iceberg.spark.RemoveCarryoverIterator}, which only removes 
carry-over rows within a
+ * single snapshot. It takes a row iterator, and assumes the following:
+ *
+ * <ul>
+ *   <li>The row iterator is partitioned by all columns.
+ *   <li>The row iterator is sorted by all columns, change order, and change 
type. The change order
+ *       is 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends ChangelogIterator {
+
+  private final int[] indicesToIdentifySameRow;
+
+  private Row cachedNextRow;
+  private Row cachedRow;
+  private long cachedRowCount;
+
+  protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType 
rowType) {
+    super(rowIterator, rowType);
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRowCount > 0) {
+      return true;
+    }
+
+    if (cachedNextRow != null) {
+      return true;
+    }
+
+    return rowIterator().hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there are cached rows, return one of them from the beginning
+    if (cachedRowCount > 0) {
+      cachedRowCount--;
+      return cachedRow;
+    }
+
+    cachedRow = getCurrentRow();
+    // return it directly if there is no more rows
+    if (!rowIterator().hasNext()) {
+      return cachedRow;
+    }
+    cachedRowCount = 1;
+
+    cachedNextRow = rowIterator().next();
+
+    // pull rows from the iterator until two consecutive rows are different
+    while (isSameRecord(cachedRow, cachedNextRow, indicesToIdentifySameRow)) {
+      if (oppositeChangeType(cachedRow, cachedNextRow)) {
+        // two rows with opposite change types means no net changes, remove 
both
+        cachedRowCount--;
+      } else {
+        // two rows with same change types means potential net changes, cache 
the next row
+        cachedRowCount++;
+      }
+
+      // stop pulling rows if there is no more rows or the next row is 
different
+      if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
+        // reset the cached next row if there is no more rows
+        cachedNextRow = null;
+        break;
+      }
+
+      cachedNextRow = rowIterator().next();
+    }
+
+    return null;
+  }
+
+  private Row getCurrentRow() {
+    Row currentRow;
+    if (cachedNextRow != null) {
+      currentRow = cachedNextRow;
+      cachedNextRow = null;
+    } else {
+      currentRow = rowIterator().next();
+    }
+    return currentRow;
+  }
+
+  private boolean oppositeChangeType(Row currentRow, Row nextRow) {
+    return (changeType(nextRow).equals(INSERT) && 
changeType(currentRow).equals(DELETE))
+        || (changeType(nextRow).equals(DELETE) && 
changeType(currentRow).equals(INSERT));
+  }
+
+  private int[] generateIndicesToIdentifySameRow() {
+    Set<Integer> metadataColumnIndices =
+        Sets.newHashSet(
+            rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name()),
+            rowType().fieldIndex(MetadataColumns.COMMIT_SNAPSHOT_ID.name()),
+            changeTypeIndex());
+    return generateIndicesToIdentifySameRow(rowType().size(), 
metadataColumnIndices);
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index 85043d2df3..259254aa2d 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -21,11 +21,14 @@ package org.apache.iceberg.spark.procedures;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.ChangelogIterator;
 import org.apache.iceberg.spark.source.SparkChangelogTable;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -88,10 +91,22 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
       ProcedureParameter.optional("options", STRING_MAP);
   private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
       ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
+
+  /**
+   * Enable or disable the remove carry-over rows.
+   *
+   * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will 
always remove carry-over
+   *     rows. Please query {@link SparkChangelogTable} instead for the use 
cases doesn't remove
+   *     carry-over rows.
+   */
+  @Deprecated
   private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
       ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
+
   private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
       ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
+  private static final ProcedureParameter NET_CHANGES =
+      ProcedureParameter.optional("net_changes", DataTypes.BooleanType);
 
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
@@ -101,6 +116,7 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
         COMPUTE_UPDATES_PARAM,
         REMOVE_CARRYOVERS_PARAM,
         IDENTIFIER_COLUMNS_PARAM,
+        NET_CHANGES,
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -142,10 +158,13 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
     Identifier changelogTableIdent = changelogTableIdent(tableIdent);
     Dataset<Row> df = loadRows(changelogTableIdent, options(input));
 
+    boolean netChanges = input.asBoolean(NET_CHANGES, false);
+
     if (shouldComputeUpdateImages(input)) {
+      Preconditions.checkArgument(!netChanges, "Not support net changes with 
update images");
       df = computeUpdateImages(identifierColumns(input, tableIdent), df);
     } else if (shouldRemoveCarryoverRows(input)) {
-      df = removeCarryoverRows(df);
+      df = removeCarryoverRows(df, netChanges);
     }
 
     String viewName = viewName(input, tableIdent.name());
@@ -164,6 +183,7 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
     for (int i = 0; i < identifierColumns.length; i++) {
       repartitionSpec[i] = df.col(identifierColumns[i]);
     }
+
     repartitionSpec[repartitionSpec.length - 1] = 
df.col(MetadataColumns.CHANGE_ORDINAL.name());
 
     return applyChangelogIterator(df, repartitionSpec);
@@ -179,13 +199,23 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
     return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
   }
 
-  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean 
netChanges) {
+    Predicate<String> columnsToKeep;
+    if (netChanges) {
+      Set<String> metadataColumn =
+          Sets.newHashSet(
+              MetadataColumns.CHANGE_TYPE.name(),
+              MetadataColumns.CHANGE_ORDINAL.name(),
+              MetadataColumns.COMMIT_SNAPSHOT_ID.name());
+
+      columnsToKeep = column -> !metadataColumn.contains(column);
+    } else {
+      columnsToKeep = column -> 
!column.equals(MetadataColumns.CHANGE_TYPE.name());
+    }
+
     Column[] repartitionSpec =
-        Arrays.stream(df.columns())
-            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
-            .map(df::col)
-            .toArray(Column[]::new);
-    return applyCarryoverRemoveIterator(df, repartitionSpec);
+        
Arrays.stream(df.columns()).filter(columnsToKeep).map(df::col).toArray(Column[]::new);
+    return applyCarryoverRemoveIterator(df, repartitionSpec, netChanges);
   }
 
   private String[] identifierColumns(ProcedureInput input, Identifier 
tableIdent) {
@@ -214,7 +244,7 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
   }
 
   private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] 
repartitionSpec) {
-    Column[] sortSpec = sortSpec(df, repartitionSpec);
+    Column[] sortSpec = sortSpec(df, repartitionSpec, false);
     StructType schema = df.schema();
     String[] identifierFields =
         
Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
@@ -228,22 +258,33 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
             RowEncoder.apply(schema));
   }
 
-  private Dataset<Row> applyCarryoverRemoveIterator(Dataset<Row> df, Column[] 
repartitionSpec) {
-    Column[] sortSpec = sortSpec(df, repartitionSpec);
+  private Dataset<Row> applyCarryoverRemoveIterator(
+      Dataset<Row> df, Column[] repartitionSpec, boolean netChanges) {
+    Column[] sortSpec = sortSpec(df, repartitionSpec, netChanges);
     StructType schema = df.schema();
 
     return df.repartition(repartitionSpec)
         .sortWithinPartitions(sortSpec)
         .mapPartitions(
             (MapPartitionsFunction<Row, Row>)
-                rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, 
schema),
+                rowIterator ->
+                    netChanges
+                        ? ChangelogIterator.removeNetCarryovers(rowIterator, 
schema)
+                        : ChangelogIterator.removeCarryovers(rowIterator, 
schema),
             RowEncoder.apply(schema));
   }
 
-  private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
-    Column[] sortSpec = new Column[repartitionSpec.length + 1];
+  private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec, 
boolean netChanges) {
+    Column changeType = df.col(MetadataColumns.CHANGE_TYPE.name());
+    Column changeOrdinal = df.col(MetadataColumns.CHANGE_ORDINAL.name());
+    Column[] extraColumns =
+        netChanges ? new Column[] {changeOrdinal, changeType} : new Column[] 
{changeType};
+
+    Column[] sortSpec = new Column[repartitionSpec.length + 
extraColumns.length];
+
     System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length);
-    sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name());
+    System.arraycopy(extraColumns, 0, sortSpec, repartitionSpec.length, 
extraColumns.length);
+
     return sortSpec;
   }
 
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
index bf98bebb9d..0539598f14 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
@@ -49,7 +49,17 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
             new StructField("name", DataTypes.StringType, false, 
Metadata.empty()),
             new StructField("data", DataTypes.StringType, true, 
Metadata.empty()),
             new StructField(
-                MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, 
false, Metadata.empty())
+                MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, 
false, Metadata.empty()),
+            new StructField(
+                MetadataColumns.CHANGE_ORDINAL.name(),
+                DataTypes.IntegerType,
+                false,
+                Metadata.empty()),
+            new StructField(
+                MetadataColumns.COMMIT_SNAPSHOT_ID.name(),
+                DataTypes.LongType,
+                false,
+                Metadata.empty())
           });
   private static final String[] IDENTIFIER_FIELDS = new String[] {"id", 
"name"};
 
@@ -93,18 +103,18 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     switch (rowType) {
       case DELETED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "b", "data", 
DELETE}, null));
+            new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE, 
0, 0}, null));
       case INSERTED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "c", "data", 
INSERT}, null));
+            new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT, 
0, 0}, null));
       case CARRY_OVER:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "d", "data", 
DELETE}, null),
-            new GenericRowWithSchema(new Object[] {index, "d", "data", 
INSERT}, null));
+            new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE, 
0, 0}, null),
+            new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT, 
0, 0}, null));
       case UPDATED:
         return Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {index, "a", "data", 
DELETE}, null),
-            new GenericRowWithSchema(new Object[] {index, "a", "new_data", 
INSERT}, null));
+            new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE, 
0, 0}, null),
+            new GenericRowWithSchema(new Object[] {index, "a", "new_data", 
INSERT, 0, 0}, null));
       default:
         throw new IllegalArgumentException("Unknown row type: " + rowType);
     }
@@ -114,18 +124,18 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     switch (rowType) {
       case DELETED:
         List<Object[]> rows = Lists.newArrayList();
-        rows.add(new Object[] {order, "b", "data", DELETE});
+        rows.add(new Object[] {order, "b", "data", DELETE, 0, 0});
         return rows;
       case INSERTED:
         List<Object[]> insertedRows = Lists.newArrayList();
-        insertedRows.add(new Object[] {order, "c", "data", INSERT});
+        insertedRows.add(new Object[] {order, "c", "data", INSERT, 0, 0});
         return insertedRows;
       case CARRY_OVER:
         return Lists.newArrayList();
       case UPDATED:
         return Lists.newArrayList(
-            new Object[] {order, "a", "data", UPDATE_BEFORE},
-            new Object[] {order, "a", "new_data", UPDATE_AFTER});
+            new Object[] {order, "a", "data", UPDATE_BEFORE, 0, 0},
+            new Object[] {order, "a", "new_data", UPDATE_AFTER, 0, 0});
       default:
         throw new IllegalArgumentException("Unknown row type: " + rowType);
     }
@@ -146,16 +156,16 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
   public void testRowsWithNullValue() {
     final List<Row> rowsWithNull =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {2, null, null, DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {3, null, null, INSERT}, 
null),
-            new GenericRowWithSchema(new Object[] {4, null, null, DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {4, null, null, INSERT}, 
null),
+            new GenericRowWithSchema(new Object[] {2, null, null, DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {3, null, null, INSERT, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {4, null, null, DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {4, null, null, INSERT, 0, 
0}, null),
             // mixed null and non-null value in non-identifier columns
-            new GenericRowWithSchema(new Object[] {5, null, null, DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {5, null, "data", INSERT}, 
null),
+            new GenericRowWithSchema(new Object[] {5, null, null, DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {5, null, "data", INSERT, 0, 
0}, null),
             // mixed null and non-null value in identifier columns
-            new GenericRowWithSchema(new Object[] {6, null, null, DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {6, "name", null, INSERT}, 
null));
+            new GenericRowWithSchema(new Object[] {6, null, null, DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {6, "name", null, INSERT, 0, 
0}, null));
 
     Iterator<Row> iterator =
         ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, 
IDENTIFIER_FIELDS);
@@ -164,12 +174,12 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     assertEquals(
         "Rows should match",
         Lists.newArrayList(
-            new Object[] {2, null, null, DELETE},
-            new Object[] {3, null, null, INSERT},
-            new Object[] {5, null, null, UPDATE_BEFORE},
-            new Object[] {5, null, "data", UPDATE_AFTER},
-            new Object[] {6, null, null, DELETE},
-            new Object[] {6, "name", null, INSERT}),
+            new Object[] {2, null, null, DELETE, 0, 0},
+            new Object[] {3, null, null, INSERT, 0, 0},
+            new Object[] {5, null, null, UPDATE_BEFORE, 0, 0},
+            new Object[] {5, null, "data", UPDATE_AFTER, 0, 0},
+            new Object[] {6, null, null, DELETE, 0, 0},
+            new Object[] {6, "name", null, INSERT, 0, 0}),
         rowsToJava(result));
   }
 
@@ -178,10 +188,10 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
             // two rows with same identifier fields(id, name)
-            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data", 
INSERT}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data", 
INSERT}, null));
+            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 
0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT, 
0, 0}, null));
 
     Iterator<Row> iterator =
         ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), 
SCHEMA, IDENTIFIER_FIELDS);
@@ -194,9 +204,9 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     // still allow extra insert rows
     rowsWithDuplication =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data1", 
INSERT}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "new_data2", 
INSERT}, null));
+            new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data1", 
INSERT, 0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "new_data2", 
INSERT, 0, 0}, null));
 
     Iterator<Row> iterator1 =
         ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), 
SCHEMA, IDENTIFIER_FIELDS);
@@ -204,9 +214,9 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     assertEquals(
         "Rows should match.",
         Lists.newArrayList(
-            new Object[] {1, "a", "data", UPDATE_BEFORE},
-            new Object[] {1, "a", "new_data1", UPDATE_AFTER},
-            new Object[] {1, "a", "new_data2", INSERT}),
+            new Object[] {1, "a", "data", UPDATE_BEFORE, 0, 0},
+            new Object[] {1, "a", "new_data1", UPDATE_AFTER, 0, 0},
+            new Object[] {1, "a", "new_data2", INSERT, 0, 0}),
         rowsToJava(Lists.newArrayList(iterator1)));
   }
 
@@ -216,32 +226,28 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
             // keep all delete rows for id 0 and id 1 since there is no insert 
row for them
-            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "a", "old_data", 
DELETE}, null),
-            new GenericRowWithSchema(new Object[] {1, "a", "old_data", 
DELETE}, null),
+            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 
0, 0}, null),
+            new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE, 
0, 0}, null),
             // the same number of delete and insert rows for id 2
-            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, 
null),
-            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT}, 
null),
-            new GenericRowWithSchema(new Object[] {3, "a", "new_data", 
INSERT}, null));
-
-    Iterator<Row> iterator =
-        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), 
SCHEMA);
-    List<Row> result = Lists.newArrayList(iterator);
+            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT, 
0, 0}, null));
 
-    assertEquals(
-        "Rows should match.",
+    List<Object[]> expectedRows =
         Lists.newArrayList(
-            new Object[] {0, "a", "data", DELETE},
-            new Object[] {0, "a", "data", DELETE},
-            new Object[] {0, "a", "data", DELETE},
-            new Object[] {1, "a", "old_data", DELETE},
-            new Object[] {1, "a", "old_data", DELETE},
-            new Object[] {3, "a", "new_data", INSERT}),
-        rowsToJava(result));
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {0, "a", "data", DELETE, 0, 0},
+            new Object[] {1, "a", "old_data", DELETE, 0, 0},
+            new Object[] {1, "a", "old_data", DELETE, 0, 0},
+            new Object[] {3, "a", "new_data", INSERT, 0, 0});
+
+    validateIterators(rowsWithDuplication, expectedRows);
   }
 
   @Test
@@ -249,45 +255,39 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     // less insert rows than delete rows
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, 
null),
-            new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT}, 
null));
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT, 0, 
0}, null));
 
-    Iterator<Row> iterator =
-        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), 
SCHEMA);
-    List<Row> result = Lists.newArrayList(iterator);
-
-    assertEquals(
-        "Rows should match.",
+    List<Object[]> expectedRows =
         Lists.newArrayList(
-            new Object[] {1, "d", "data", DELETE}, new Object[] {2, "d", 
"data", INSERT}),
-        rowsToJava(result));
+            new Object[] {1, "d", "data", DELETE, 0, 0},
+            new Object[] {2, "d", "data", INSERT, 0, 0});
+
+    validateIterators(rowsWithDuplication, expectedRows);
   }
 
   @Test
   public void testCarryRowsRemoveMoreInsertRows() {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
-            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, 
null),
+            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null),
             // more insert rows than delete rows, should keep extra insert rows
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT}, 
null));
-
-    Iterator<Row> iterator =
-        ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), 
SCHEMA);
-    List<Row> result = Lists.newArrayList(iterator);
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 
0}, null));
 
-    assertEquals(
-        "Rows should match.",
+    List<Object[]> expectedRows =
         Lists.newArrayList(
-            new Object[] {0, "d", "data", DELETE}, new Object[] {1, "d", 
"data", INSERT}),
-        rowsToJava(result));
+            new Object[] {0, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", INSERT, 0, 0});
+
+    validateIterators(rowsWithDuplication, expectedRows);
   }
 
   @Test
@@ -296,17 +296,64 @@ public class TestChangelogIterator extends 
SparkTestHelperBase {
     List<Row> rowsWithDuplication =
         Lists.newArrayList(
             // next two rows are identical
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, 
null),
-            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE}, 
null));
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null));
+
+    List<Object[]> expectedRows =
+        Lists.newArrayList(
+            new Object[] {1, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", DELETE, 0, 0});
 
+    validateIterators(rowsWithDuplication, expectedRows);
+  }
+
+  private void validateIterators(List<Row> rowsWithDuplication, List<Object[]> 
expectedRows) {
     Iterator<Row> iterator =
         ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), 
SCHEMA);
     List<Row> result = Lists.newArrayList(iterator);
 
-    assertEquals(
-        "Duplicate rows should not be removed",
+    assertEquals("Rows should match.", expectedRows, rowsToJava(result));
+
+    iterator = 
ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA);
+    result = Lists.newArrayList(iterator);
+
+    assertEquals("Rows should match.", expectedRows, rowsToJava(result));
+  }
+
+  @Test
+  public void testRemoveNetCarryovers() {
+    List<Row> rowsWithDuplication =
         Lists.newArrayList(
-            new Object[] {1, "d", "data", DELETE}, new Object[] {1, "d", 
"data", DELETE}),
-        rowsToJava(result));
+            // this row are different from other rows, it is a net change, 
should be kept
+            new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 
0}, null),
+            // a pair of delete and insert rows, should be removed
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 
0}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 
0}, null),
+            // 2 delete rows and 2 insert rows, should be removed
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 
1}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 
1}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 
1}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 
1}, null),
+            // a pair of insert and delete rows across snapshots, should be 
removed
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 2, 
2}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 3, 
3}, null),
+            // extra insert rows, they are net changes, should be kept
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 
4}, null),
+            new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 
4}, null),
+            // different key, net changes, should be kept
+            new GenericRowWithSchema(new Object[] {2, "d", "data", DELETE, 4, 
4}, null));
+
+    List<Object[]> expectedRows =
+        Lists.newArrayList(
+            new Object[] {0, "d", "data", DELETE, 0, 0},
+            new Object[] {1, "d", "data", INSERT, 4, 4},
+            new Object[] {1, "d", "data", INSERT, 4, 4},
+            new Object[] {2, "d", "data", DELETE, 4, 4});
+
+    Iterator<Row> iterator =
+        ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), 
SCHEMA);
+    List<Row> result = Lists.newArrayList(iterator);
+
+    assertEquals("Rows should match.", expectedRows, rowsToJava(result));
   }
 }


Reply via email to