This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f5346cacf1 Spark 3.4: Output the net changes across snapshots for carryover rows in CDC (#7950)
f5346cacf1 is described below
commit f5346cacf17d63257ab433215e1661ed221750d0
Author: Yufei Gu <[email protected]>
AuthorDate: Fri Jun 30 23:36:09 2023 -0700
Spark 3.4: Output the net changes across snapshots for carryover rows in CDC (#7950)
---
.../TestCreateChangelogViewProcedure.java | 91 ++++++--
.../apache/iceberg/spark/ChangelogIterator.java | 42 ++++
.../iceberg/spark/ComputeUpdateIterator.java | 8 +-
.../iceberg/spark/RemoveCarryoverIterator.java | 33 +--
.../iceberg/spark/RemoveNetCarryoverIterator.java | 129 ++++++++++++
.../procedures/CreateChangelogViewProcedure.java | 69 ++++--
.../iceberg/spark/TestChangelogIterator.java | 231 +++++++++++++--------
7 files changed, 456 insertions(+), 147 deletions(-)
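Before the diff itself, a minimal usage sketch of the new net_changes option (the catalog name cat and table db.tbl are placeholders; assumes an active SparkSession with the Iceberg SQL extensions, and mirrors the CALL syntax exercised by the tests below):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.active();

    // Create a changelog view that folds carry-over rows across snapshots and
    // keeps only the net changes; the procedure returns the view name.
    Dataset<Row> returns =
        spark.sql(
            "CALL cat.system.create_changelog_view(table => 'db.tbl', net_changes => true)");
    String viewName = returns.first().getString(0);

    // Inspect the net changes in snapshot order.
    spark.sql("SELECT * FROM " + viewName + " ORDER BY _change_ordinal").show();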
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
index dc12b0145d..0a4a1073c3 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.junit.Assert.assertThrows;
+
import java.util.List;
import java.util.Map;
import org.apache.iceberg.ChangelogOperation;
@@ -45,13 +47,13 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- public void createTableWith2Columns() {
+ public void createTableWithTwoColumns() {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName,
1);
sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
}
- private void createTableWith3Columns() {
+ private void createTableWithThreeColumns() {
sql("CREATE TABLE %s (id INT, data STRING, age INT) USING iceberg",
tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName,
1);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
@@ -65,7 +67,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testCustomizedViewName() {
- createTableWith2Columns();
+ createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
@@ -98,7 +100,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testNoSnapshotIdInput() {
- createTableWith2Columns();
+ createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap0 = table.currentSnapshot();
@@ -129,7 +131,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testTimestampsBasedQuery() {
- createTableWith2Columns();
+ createTableWithTwoColumns();
long beginning = System.currentTimeMillis();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
@@ -189,7 +191,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testWithCarryovers() {
- createTableWith2Columns();
+ createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap0 = table.currentSnapshot();
@@ -224,7 +226,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testUpdate() {
- createTableWith2Columns();
+ createTableWithTwoColumns();
sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
@@ -283,7 +285,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testUpdateWithFilter() {
- createTableWith2Columns();
+ createTableWithTwoColumns();
sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
@@ -315,7 +317,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testUpdateWithMultipleIdentifierColumns() {
- createTableWith3Columns();
+ createTableWithThreeColumns();
sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -347,7 +349,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testRemoveCarryOvers() {
- createTableWith3Columns();
+ createTableWithThreeColumns();
sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)",
tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -381,7 +383,7 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
@Test
public void testRemoveCarryOversWithoutUpdatedRows() {
- createTableWith3Columns();
+ createTableWithThreeColumns();
sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)",
tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -411,9 +413,74 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
sql("select * from %s order by _change_ordinal, id, data", viewName));
}
+ @Test
+ public void testNetChangesWithRemoveCarryOvers() {
+ // partitioned by id
+ createTableWithThreeColumns();
+
+ // insert rows: (1, 'a', 12) (2, 'b', 11) (2, 'e', 12)
+ sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)",
tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snap1 = table.currentSnapshot();
+
+ // delete rows: (2, 'b', 11) (2, 'e', 12)
+ // insert rows: (3, 'c', 13) (2, 'd', 11) (2, 'e', 12)
+ sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)",
tableName);
+ table.refresh();
+ Snapshot snap2 = table.currentSnapshot();
+
+ // delete rows: (2, 'd', 11) (2, 'e', 12) (3, 'c', 13)
+ // insert rows: (3, 'c', 15) (2, 'e', 12)
+ sql("INSERT OVERWRITE %s VALUES (3, 'c', 15), (2, 'e', 12)", tableName);
+ table.refresh();
+ Snapshot snap3 = table.currentSnapshot();
+
+ // test with all snapshots
+ List<Object[]> returns =
+ sql(
+ "CALL %s.system.create_changelog_view(table => '%s', net_changes
=> true)",
+ catalogName, tableName);
+
+ String viewName = (String) returns.get(0)[0];
+
+ assertEquals(
+ "Rows should match",
+ ImmutableList.of(
+ row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+ row(3, "c", 15, INSERT, 2, snap3.snapshotId()),
+ row(2, "e", 12, INSERT, 2, snap3.snapshotId())),
+ sql("select * from %s order by _change_ordinal, data", viewName));
+
+ // test with snap2 and snap3
+ sql(
+ "CALL %s.system.create_changelog_view(table => '%s', "
+ + "options => map('start-snapshot-id','%s'), "
+ + "net_changes => true)",
+ catalogName, tableName, snap1.snapshotId());
+
+ assertEquals(
+ "Rows should match",
+ ImmutableList.of(
+ row(2, "b", 11, DELETE, 0, snap2.snapshotId()),
+ row(3, "c", 15, INSERT, 1, snap3.snapshotId())),
+ sql("select * from %s order by _change_ordinal, data", viewName));
+ }
+
+ @Test
+ public void testNetChangesWithComputeUpdates() {
+ createTableWithTwoColumns();
+ assertThrows(
+ "Should fail because net_changes is not supported with computing
updates",
+ IllegalArgumentException.class,
+ () ->
+ sql(
+ "CALL %s.system.create_changelog_view(table => '%s',
identifier_columns => array('id'), net_changes => true)",
+ catalogName, tableName));
+ }
+
@Test
public void testNotRemoveCarryOvers() {
- createTableWith3Columns();
+ createTableWithThreeColumns();
sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)",
tableName);
Table table = validationCatalog.loadTable(tableIdent);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
index 5f30c5fd4e..cc44b1f399 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java
@@ -20,8 +20,10 @@ package org.apache.iceberg.spark;
import java.util.Iterator;
import java.util.Objects;
+import java.util.Set;
import org.apache.iceberg.ChangelogOperation;
import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -35,9 +37,11 @@ public abstract class ChangelogIterator implements Iterator<Row> {
private final Iterator<Row> rowIterator;
private final int changeTypeIndex;
+ private final StructType rowType;
protected ChangelogIterator(Iterator<Row> rowIterator, StructType rowType) {
this.rowIterator = rowIterator;
+ this.rowType = rowType;
this.changeTypeIndex = rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name());
}
@@ -45,6 +49,16 @@ public abstract class ChangelogIterator implements Iterator<Row> {
return changeTypeIndex;
}
+ protected StructType rowType() {
+ return rowType;
+ }
+
+ protected String changeType(Row row) {
+ String changeType = row.getString(changeTypeIndex());
+ Preconditions.checkNotNull(changeType, "Change type should not be null");
+ return changeType;
+ }
+
protected Iterator<Row> rowIterator() {
return rowIterator;
}
@@ -79,7 +93,35 @@ public abstract class ChangelogIterator implements Iterator<Row> {
return Iterators.filter(changelogIterator, Objects::nonNull);
}
+ public static Iterator<Row> removeNetCarryovers(Iterator<Row> rowIterator, StructType rowType) {
+ ChangelogIterator changelogIterator = new RemoveNetCarryoverIterator(rowIterator, rowType);
+ return Iterators.filter(changelogIterator, Objects::nonNull);
+ }
+
+ protected boolean isSameRecord(Row currentRow, Row nextRow, int[] indicesToIdentifySameRow) {
+ for (int idx : indicesToIdentifySameRow) {
+ if (isDifferentValue(currentRow, nextRow, idx)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
protected boolean isDifferentValue(Row currentRow, Row nextRow, int idx) {
return !Objects.equals(nextRow.get(idx), currentRow.get(idx));
}
+
+ protected static int[] generateIndicesToIdentifySameRow(
+ int totalColumnCount, Set<Integer> metadataColumnIndices) {
+ int[] indices = new int[totalColumnCount - metadataColumnIndices.size()];
+
+ for (int i = 0, j = 0; i < totalColumnCount; i++) {
+ if (!metadataColumnIndices.contains(i)) {
+ indices[j] = i;
+ j++;
+ }
+ }
+ return indices;
+ }
}
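The shared helper above picks out the data-column positions used to compare rows for sameness. A quick sanity sketch against the schema the tests below use (id, name, data, _change_type, _change_ordinal, _commit_snapshot_id); the values here are illustrative, and the call is made from subclass scope since the helper is protected:

    import java.util.Set;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;

    // With 6 columns and the metadata columns at positions 3, 4, and 5,
    // only the data-column indices remain for row comparison.
    Set<Integer> metadataColumnIndices = Sets.newHashSet(3, 4, 5);
    int[] indices = generateIndicesToIdentifySameRow(6, metadataColumnIndices);
    // indices == {0, 1, 2}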
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
index 23e6a19a17..6951c33e51 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java
@@ -81,15 +81,13 @@ public class ComputeUpdateIterator extends ChangelogIterator {
// either a cached record which is not an UPDATE or the next record in the iterator.
Row currentRow = currentRow();
- if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
+ if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
Row nextRow = rowIterator().next();
cachedRow = nextRow;
if (sameLogicalRow(currentRow, nextRow)) {
- String nextRowChangeType = nextRow.getString(changeTypeIndex());
-
Preconditions.checkState(
- nextRowChangeType.equals(INSERT),
+ changeType(nextRow).equals(INSERT),
"Cannot compute updates because there are multiple rows with the
same identifier"
+ " fields([%s]). Please make sure the rows are unique.",
String.join(",", identifierFields));
@@ -118,7 +116,7 @@ public class ComputeUpdateIterator extends ChangelogIterator {
}
private boolean cachedUpdateRecord() {
- return cachedRow != null && cachedRow.getString(changeTypeIndex()).equals(UPDATE_AFTER);
+ return cachedRow != null && changeType(cachedRow).equals(UPDATE_AFTER);
}
private Row currentRow() {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
index 70b160e13f..2e90dc7749 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark;
import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator {
RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
super(rowIterator, rowType);
- this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size());
+ this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
}
@Override
@@ -88,7 +90,7 @@ class RemoveCarryoverIterator extends ChangelogIterator {
}
// If the current row is a delete row, drain all identical delete rows
- if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
+ if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
cachedDeletedRow = currentRow;
deletedRowCount = 1;
@@ -98,8 +100,8 @@ class RemoveCarryoverIterator extends ChangelogIterator {
// row is the same record
while (nextRow != null
&& cachedDeletedRow != null
- && isSameRecord(cachedDeletedRow, nextRow)) {
- if (nextRow.getString(changeTypeIndex()).equals(INSERT)) {
+ && isSameRecord(cachedDeletedRow, nextRow,
indicesToIdentifySameRow)) {
+ if (changeType(nextRow).equals(INSERT)) {
deletedRowCount--;
if (deletedRowCount == 0) {
cachedDeletedRow = null;
@@ -139,25 +141,8 @@ class RemoveCarryoverIterator extends ChangelogIterator {
return cachedDeletedRow != null;
}
- private int[] generateIndicesToIdentifySameRow(int columnSize) {
- int[] indices = new int[columnSize - 1];
- for (int i = 0; i < indices.length; i++) {
- if (i < changeTypeIndex()) {
- indices[i] = i;
- } else {
- indices[i] = i + 1;
- }
- }
- return indices;
- }
-
- private boolean isSameRecord(Row currentRow, Row nextRow) {
- for (int idx : indicesToIdentifySameRow) {
- if (isDifferentValue(currentRow, nextRow, idx)) {
- return false;
- }
- }
-
- return true;
+ private int[] generateIndicesToIdentifySameRow() {
+ Set<Integer> metadataColumnIndices = Sets.newHashSet(changeTypeIndex());
+ return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
}
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
new file mode 100644
index 0000000000..941e4a4731
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It is different from {@link
+ * org.apache.iceberg.spark.RemoveCarryoverIterator}, which only removes carry-over rows within a
+ * single snapshot. It takes a row iterator and assumes the following:
+ *
+ * <ul>
+ *   <li>The row iterator is partitioned by all columns.
+ *   <li>The row iterator is sorted by all columns, change order, and change type. The change
+ *       order maps 1-to-1 to snapshot ids.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends ChangelogIterator {
+
+ private final int[] indicesToIdentifySameRow;
+
+ private Row cachedNextRow;
+ private Row cachedRow;
+ private long cachedRowCount;
+
+ protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+ super(rowIterator, rowType);
+ this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (cachedRowCount > 0) {
+ return true;
+ }
+
+ if (cachedNextRow != null) {
+ return true;
+ }
+
+ return rowIterator().hasNext();
+ }
+
+ @Override
+ public Row next() {
+ // if there are cached rows, return one of them from the beginning
+ if (cachedRowCount > 0) {
+ cachedRowCount--;
+ return cachedRow;
+ }
+
+ cachedRow = getCurrentRow();
+ // return it directly if there are no more rows
+ if (!rowIterator().hasNext()) {
+ return cachedRow;
+ }
+ cachedRowCount = 1;
+
+ cachedNextRow = rowIterator().next();
+
+ // pull rows from the iterator until two consecutive rows are different
+ while (isSameRecord(cachedRow, cachedNextRow, indicesToIdentifySameRow)) {
+ if (oppositeChangeType(cachedRow, cachedNextRow)) {
+ // two rows with opposite change types mean no net change; remove both
+ cachedRowCount--;
+ } else {
+ // two rows with the same change type mean potential net changes; cache the next row
+ cachedRowCount++;
+ }
+
+ // stop pulling rows if there are no more rows or the next row is different
+ if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
+ // reset the cached next row if there are no more rows
+ cachedNextRow = null;
+ break;
+ }
+
+ cachedNextRow = rowIterator().next();
+ }
+
+ return null;
+ }
+
+ private Row getCurrentRow() {
+ Row currentRow;
+ if (cachedNextRow != null) {
+ currentRow = cachedNextRow;
+ cachedNextRow = null;
+ } else {
+ currentRow = rowIterator().next();
+ }
+ return currentRow;
+ }
+
+ private boolean oppositeChangeType(Row currentRow, Row nextRow) {
+ return (changeType(nextRow).equals(INSERT) && changeType(currentRow).equals(DELETE))
+ || (changeType(nextRow).equals(DELETE) && changeType(currentRow).equals(INSERT));
+ }
+
+ private int[] generateIndicesToIdentifySameRow() {
+ Set<Integer> metadataColumnIndices =
+ Sets.newHashSet(
+ rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name()),
+ rowType().fieldIndex(MetadataColumns.COMMIT_SNAPSHOT_ID.name()),
+ changeTypeIndex());
+ return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
+ }
+}
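To make the collapsing concrete, a small hypothetical walk-through using the public entry point added to ChangelogIterator above (the row values and snapshot ids 100L/101L are made up; the change-type strings follow ChangelogOperation names):

    import java.util.Iterator;
    import java.util.List;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.apache.iceberg.spark.ChangelogIterator;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    StructType schema =
        new StructType(
            new StructField[] {
              new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
              new StructField("_change_type", DataTypes.StringType, false, Metadata.empty()),
              new StructField("_change_ordinal", DataTypes.IntegerType, false, Metadata.empty()),
              new StructField("_commit_snapshot_id", DataTypes.LongType, false, Metadata.empty())
            });

    // Input is pre-sorted by data columns, then change ordinal, then change type,
    // as the class doc requires. The DELETE in ordinal 0 and the identical INSERT
    // in ordinal 1 form an opposite-type pair with no net effect, so both drop out.
    List<Row> sorted =
        Lists.newArrayList(
            new GenericRowWithSchema(new Object[] {1, "DELETE", 0, 100L}, schema),
            new GenericRowWithSchema(new Object[] {1, "INSERT", 1, 101L}, schema));

    Iterator<Row> netChanges = ChangelogIterator.removeNetCarryovers(sorted.iterator(), schema);
    // netChanges yields no rows: the pair cancels out.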
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index 85043d2df3..259254aa2d 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -21,11 +21,14 @@ package org.apache.iceberg.spark.procedures;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.ChangelogIterator;
import org.apache.iceberg.spark.source.SparkChangelogTable;
import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -88,10 +91,22 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
+
+ /**
+ * Enable or disable removing carry-over rows.
+ *
+ * @deprecated since 1.4.0, will be removed in 1.5.0; the procedure will always remove carry-over
+ *     rows. Please query {@link SparkChangelogTable} instead for use cases that do not remove
+ *     carry-over rows.
+ */
+ @Deprecated
private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
+
private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
+ private static final ProcedureParameter NET_CHANGES =
+ ProcedureParameter.optional("net_changes", DataTypes.BooleanType);
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
@@ -101,6 +116,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
+ NET_CHANGES,
};
private static final StructType OUTPUT_TYPE =
@@ -142,10 +158,13 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
Identifier changelogTableIdent = changelogTableIdent(tableIdent);
Dataset<Row> df = loadRows(changelogTableIdent, options(input));
+ boolean netChanges = input.asBoolean(NET_CHANGES, false);
+
if (shouldComputeUpdateImages(input)) {
+ Preconditions.checkArgument(!netChanges, "Net changes are not supported with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
} else if (shouldRemoveCarryoverRows(input)) {
- df = removeCarryoverRows(df);
+ df = removeCarryoverRows(df, netChanges);
}
String viewName = viewName(input, tableIdent.name());
@@ -164,6 +183,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
for (int i = 0; i < identifierColumns.length; i++) {
repartitionSpec[i] = df.col(identifierColumns[i]);
}
+
repartitionSpec[repartitionSpec.length - 1] = df.col(MetadataColumns.CHANGE_ORDINAL.name());
return applyChangelogIterator(df, repartitionSpec);
@@ -179,13 +199,23 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
}
- private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+ private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
+ Predicate<String> columnsToKeep;
+ if (netChanges) {
+ Set<String> metadataColumn =
+ Sets.newHashSet(
+ MetadataColumns.CHANGE_TYPE.name(),
+ MetadataColumns.CHANGE_ORDINAL.name(),
+ MetadataColumns.COMMIT_SNAPSHOT_ID.name());
+
+ columnsToKeep = column -> !metadataColumn.contains(column);
+ } else {
+ columnsToKeep = column -> !column.equals(MetadataColumns.CHANGE_TYPE.name());
+ }
+
Column[] repartitionSpec =
- Arrays.stream(df.columns())
- .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
- .map(df::col)
- .toArray(Column[]::new);
- return applyCarryoverRemoveIterator(df, repartitionSpec);
+ Arrays.stream(df.columns()).filter(columnsToKeep).map(df::col).toArray(Column[]::new);
+ return applyCarryoverRemoveIterator(df, repartitionSpec, netChanges);
}
private String[] identifierColumns(ProcedureInput input, Identifier tableIdent) {
@@ -214,7 +244,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
}
private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitionSpec) {
- Column[] sortSpec = sortSpec(df, repartitionSpec);
+ Column[] sortSpec = sortSpec(df, repartitionSpec, false);
StructType schema = df.schema();
String[] identifierFields =
Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
@@ -228,22 +258,33 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
RowEncoder.apply(schema));
}
- private Dataset<Row> applyCarryoverRemoveIterator(Dataset<Row> df, Column[] repartitionSpec) {
- Column[] sortSpec = sortSpec(df, repartitionSpec);
+ private Dataset<Row> applyCarryoverRemoveIterator(
+ Dataset<Row> df, Column[] repartitionSpec, boolean netChanges) {
+ Column[] sortSpec = sortSpec(df, repartitionSpec, netChanges);
StructType schema = df.schema();
return df.repartition(repartitionSpec)
.sortWithinPartitions(sortSpec)
.mapPartitions(
(MapPartitionsFunction<Row, Row>)
- rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema),
+ rowIterator ->
+ netChanges
+ ? ChangelogIterator.removeNetCarryovers(rowIterator, schema)
+ : ChangelogIterator.removeCarryovers(rowIterator, schema),
RowEncoder.apply(schema));
}
- private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
- Column[] sortSpec = new Column[repartitionSpec.length + 1];
+ private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec, boolean netChanges) {
+ Column changeType = df.col(MetadataColumns.CHANGE_TYPE.name());
+ Column changeOrdinal = df.col(MetadataColumns.CHANGE_ORDINAL.name());
+ Column[] extraColumns =
+ netChanges ? new Column[] {changeOrdinal, changeType} : new Column[] {changeType};
+
+ Column[] sortSpec = new Column[repartitionSpec.length + extraColumns.length];
+
System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length);
- sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name());
+ System.arraycopy(extraColumns, 0, sortSpec, repartitionSpec.length, extraColumns.length);
+
return sortSpec;
}
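When net_changes is set, the procedure clusters rows by every data column and orders each partition by change ordinal and then change type, so a logical row's whole history reaches the iterator as one ordered run. A condensed sketch of the equivalent Dataset transformation (df and its id/data columns are hypothetical):

    import org.apache.iceberg.MetadataColumns;
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Same shape as the repartition/sort built above when net_changes is true:
    // data columns first, then _change_ordinal, then _change_type, which is the
    // ordering RemoveNetCarryoverIterator assumes.
    Column[] repartitionSpec = new Column[] {df.col("id"), df.col("data")};
    Dataset<Row> sorted =
        df.repartition(repartitionSpec)
            .sortWithinPartitions(
                df.col("id"),
                df.col("data"),
                df.col(MetadataColumns.CHANGE_ORDINAL.name()),
                df.col(MetadataColumns.CHANGE_TYPE.name()));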
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
index bf98bebb9d..0539598f14 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
@@ -49,7 +49,17 @@ public class TestChangelogIterator extends SparkTestHelperBase {
new StructField("name", DataTypes.StringType, false,
Metadata.empty()),
new StructField("data", DataTypes.StringType, true,
Metadata.empty()),
new StructField(
- MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty())
+ MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType, false, Metadata.empty()),
+ new StructField(
+ MetadataColumns.CHANGE_ORDINAL.name(),
+ DataTypes.IntegerType,
+ false,
+ Metadata.empty()),
+ new StructField(
+ MetadataColumns.COMMIT_SNAPSHOT_ID.name(),
+ DataTypes.LongType,
+ false,
+ Metadata.empty())
});
private static final String[] IDENTIFIER_FIELDS = new String[] {"id", "name"};
@@ -93,18 +103,18 @@ public class TestChangelogIterator extends SparkTestHelperBase {
switch (rowType) {
case DELETED:
return Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {index, "b", "data",
DELETE}, null));
+ new GenericRowWithSchema(new Object[] {index, "b", "data", DELETE,
0, 0}, null));
case INSERTED:
return Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {index, "c", "data",
INSERT}, null));
+ new GenericRowWithSchema(new Object[] {index, "c", "data", INSERT,
0, 0}, null));
case CARRY_OVER:
return Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {index, "d", "data",
DELETE}, null),
- new GenericRowWithSchema(new Object[] {index, "d", "data",
INSERT}, null));
+ new GenericRowWithSchema(new Object[] {index, "d", "data", DELETE,
0, 0}, null),
+ new GenericRowWithSchema(new Object[] {index, "d", "data", INSERT,
0, 0}, null));
case UPDATED:
return Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {index, "a", "data",
DELETE}, null),
- new GenericRowWithSchema(new Object[] {index, "a", "new_data",
INSERT}, null));
+ new GenericRowWithSchema(new Object[] {index, "a", "data", DELETE,
0, 0}, null),
+ new GenericRowWithSchema(new Object[] {index, "a", "new_data",
INSERT, 0, 0}, null));
default:
throw new IllegalArgumentException("Unknown row type: " + rowType);
}
@@ -114,18 +124,18 @@ public class TestChangelogIterator extends SparkTestHelperBase {
switch (rowType) {
case DELETED:
List<Object[]> rows = Lists.newArrayList();
- rows.add(new Object[] {order, "b", "data", DELETE});
+ rows.add(new Object[] {order, "b", "data", DELETE, 0, 0});
return rows;
case INSERTED:
List<Object[]> insertedRows = Lists.newArrayList();
- insertedRows.add(new Object[] {order, "c", "data", INSERT});
+ insertedRows.add(new Object[] {order, "c", "data", INSERT, 0, 0});
return insertedRows;
case CARRY_OVER:
return Lists.newArrayList();
case UPDATED:
return Lists.newArrayList(
- new Object[] {order, "a", "data", UPDATE_BEFORE},
- new Object[] {order, "a", "new_data", UPDATE_AFTER});
+ new Object[] {order, "a", "data", UPDATE_BEFORE, 0, 0},
+ new Object[] {order, "a", "new_data", UPDATE_AFTER, 0, 0});
default:
throw new IllegalArgumentException("Unknown row type: " + rowType);
}
@@ -146,16 +156,16 @@ public class TestChangelogIterator extends SparkTestHelperBase {
public void testRowsWithNullValue() {
final List<Row> rowsWithNull =
Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {2, null, null, DELETE}, null),
- new GenericRowWithSchema(new Object[] {3, null, null, INSERT}, null),
- new GenericRowWithSchema(new Object[] {4, null, null, DELETE}, null),
- new GenericRowWithSchema(new Object[] {4, null, null, INSERT}, null),
+ new GenericRowWithSchema(new Object[] {2, null, null, DELETE, 0, 0}, null),
+ new GenericRowWithSchema(new Object[] {3, null, null, INSERT, 0, 0}, null),
+ new GenericRowWithSchema(new Object[] {4, null, null, DELETE, 0, 0}, null),
+ new GenericRowWithSchema(new Object[] {4, null, null, INSERT, 0, 0}, null),
// mixed null and non-null value in non-identifier columns
- new GenericRowWithSchema(new Object[] {5, null, null, DELETE}, null),
- new GenericRowWithSchema(new Object[] {5, null, "data", INSERT}, null),
+ new GenericRowWithSchema(new Object[] {5, null, null, DELETE, 0, 0}, null),
+ new GenericRowWithSchema(new Object[] {5, null, "data", INSERT, 0, 0}, null),
// mixed null and non-null value in identifier columns
- new GenericRowWithSchema(new Object[] {6, null, null, DELETE}, null),
- new GenericRowWithSchema(new Object[] {6, "name", null, INSERT}, null));
+ new GenericRowWithSchema(new Object[] {6, null, null, DELETE, 0, 0}, null),
+ new GenericRowWithSchema(new Object[] {6, "name", null, INSERT, 0, 0}, null));
Iterator<Row> iterator =
ChangelogIterator.computeUpdates(rowsWithNull.iterator(), SCHEMA, IDENTIFIER_FIELDS);
@@ -164,12 +174,12 @@ public class TestChangelogIterator extends SparkTestHelperBase {
assertEquals(
"Rows should match",
Lists.newArrayList(
- new Object[] {2, null, null, DELETE},
- new Object[] {3, null, null, INSERT},
- new Object[] {5, null, null, UPDATE_BEFORE},
- new Object[] {5, null, "data", UPDATE_AFTER},
- new Object[] {6, null, null, DELETE},
- new Object[] {6, "name", null, INSERT}),
+ new Object[] {2, null, null, DELETE, 0, 0},
+ new Object[] {3, null, null, INSERT, 0, 0},
+ new Object[] {5, null, null, UPDATE_BEFORE, 0, 0},
+ new Object[] {5, null, "data", UPDATE_AFTER, 0, 0},
+ new Object[] {6, null, null, DELETE, 0, 0},
+ new Object[] {6, "name", null, INSERT, 0, 0}),
rowsToJava(result));
}
@@ -178,10 +188,10 @@ public class TestChangelogIterator extends SparkTestHelperBase {
List<Row> rowsWithDuplication =
Lists.newArrayList(
// two rows with same identifier fields(id, name)
- new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "a", "new_data",
INSERT}, null),
- new GenericRowWithSchema(new Object[] {1, "a", "new_data",
INSERT}, null));
+ new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT,
0, 0}, null),
+ new GenericRowWithSchema(new Object[] {1, "a", "new_data", INSERT,
0, 0}, null));
Iterator<Row> iterator =
ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS);
@@ -194,9 +204,9 @@ public class TestChangelogIterator extends SparkTestHelperBase {
// still allow extra insert rows
rowsWithDuplication =
Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "a", "new_data1",
INSERT}, null),
- new GenericRowWithSchema(new Object[] {1, "a", "new_data2",
INSERT}, null));
+ new GenericRowWithSchema(new Object[] {1, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "a", "new_data1",
INSERT, 0, 0}, null),
+ new GenericRowWithSchema(new Object[] {1, "a", "new_data2",
INSERT, 0, 0}, null));
Iterator<Row> iterator1 =
ChangelogIterator.computeUpdates(rowsWithDuplication.iterator(), SCHEMA, IDENTIFIER_FIELDS);
@@ -204,9 +214,9 @@ public class TestChangelogIterator extends SparkTestHelperBase {
assertEquals(
"Rows should match.",
Lists.newArrayList(
- new Object[] {1, "a", "data", UPDATE_BEFORE},
- new Object[] {1, "a", "new_data1", UPDATE_AFTER},
- new Object[] {1, "a", "new_data2", INSERT}),
+ new Object[] {1, "a", "data", UPDATE_BEFORE, 0, 0},
+ new Object[] {1, "a", "new_data1", UPDATE_AFTER, 0, 0},
+ new Object[] {1, "a", "new_data2", INSERT, 0, 0}),
rowsToJava(Lists.newArrayList(iterator1)));
}
@@ -216,32 +226,28 @@ public class TestChangelogIterator extends SparkTestHelperBase {
List<Row> rowsWithDuplication =
Lists.newArrayList(
// keep all delete rows for id 0 and id 1 since there is no insert row for them
- new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "a", "old_data",
DELETE}, null),
- new GenericRowWithSchema(new Object[] {1, "a", "old_data",
DELETE}, null),
+ new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {0, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE,
0, 0}, null),
+ new GenericRowWithSchema(new Object[] {1, "a", "old_data", DELETE,
0, 0}, null),
// the same number of delete and insert rows for id 2
- new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT},
null),
- new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT},
null),
- new GenericRowWithSchema(new Object[] {3, "a", "new_data",
INSERT}, null));
-
- Iterator<Row> iterator =
- ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(),
SCHEMA);
- List<Row> result = Lists.newArrayList(iterator);
+ new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {2, "a", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {2, "a", "data", INSERT, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {3, "a", "new_data", INSERT,
0, 0}, null));
- assertEquals(
- "Rows should match.",
+ List<Object[]> expectedRows =
Lists.newArrayList(
- new Object[] {0, "a", "data", DELETE},
- new Object[] {0, "a", "data", DELETE},
- new Object[] {0, "a", "data", DELETE},
- new Object[] {1, "a", "old_data", DELETE},
- new Object[] {1, "a", "old_data", DELETE},
- new Object[] {3, "a", "new_data", INSERT}),
- rowsToJava(result));
+ new Object[] {0, "a", "data", DELETE, 0, 0},
+ new Object[] {0, "a", "data", DELETE, 0, 0},
+ new Object[] {0, "a", "data", DELETE, 0, 0},
+ new Object[] {1, "a", "old_data", DELETE, 0, 0},
+ new Object[] {1, "a", "old_data", DELETE, 0, 0},
+ new Object[] {3, "a", "new_data", INSERT, 0, 0});
+
+ validateIterators(rowsWithDuplication, expectedRows);
}
@Test
@@ -249,45 +255,39 @@ public class TestChangelogIterator extends SparkTestHelperBase {
// less insert rows than delete rows
List<Row> rowsWithDuplication =
Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT},
null),
- new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT},
null));
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {2, "d", "data", INSERT, 0,
0}, null));
- Iterator<Row> iterator =
- ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
- List<Row> result = Lists.newArrayList(iterator);
-
- assertEquals(
- "Rows should match.",
+ List<Object[]> expectedRows =
Lists.newArrayList(
- new Object[] {1, "d", "data", DELETE}, new Object[] {2, "d",
"data", INSERT}),
- rowsToJava(result));
+ new Object[] {1, "d", "data", DELETE, 0, 0},
+ new Object[] {2, "d", "data", INSERT, 0, 0});
+
+ validateIterators(rowsWithDuplication, expectedRows);
}
@Test
public void testCarryRowsRemoveMoreInsertRows() {
List<Row> rowsWithDuplication =
Lists.newArrayList(
- new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE},
null),
+ new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0,
0}, null),
// more insert rows than delete rows, should keep extra insert rows
- new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT},
null));
-
- Iterator<Row> iterator =
- ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(),
SCHEMA);
- List<Row> result = Lists.newArrayList(iterator);
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0,
0}, null));
- assertEquals(
- "Rows should match.",
+ List<Object[]> expectedRows =
Lists.newArrayList(
- new Object[] {0, "d", "data", DELETE}, new Object[] {1, "d",
"data", INSERT}),
- rowsToJava(result));
+ new Object[] {0, "d", "data", DELETE, 0, 0},
+ new Object[] {1, "d", "data", INSERT, 0, 0});
+
+ validateIterators(rowsWithDuplication, expectedRows);
}
@Test
@@ -296,17 +296,64 @@ public class TestChangelogIterator extends SparkTestHelperBase {
List<Row> rowsWithDuplication =
Lists.newArrayList(
// next two rows are identical
- new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE},
null),
- new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE},
null));
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0,
0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0,
0}, null));
+
+ List<Object[]> expectedRows =
+ Lists.newArrayList(
+ new Object[] {1, "d", "data", DELETE, 0, 0},
+ new Object[] {1, "d", "data", DELETE, 0, 0});
+ validateIterators(rowsWithDuplication, expectedRows);
+ }
+
+ private void validateIterators(List<Row> rowsWithDuplication, List<Object[]> expectedRows) {
Iterator<Row> iterator =
ChangelogIterator.removeCarryovers(rowsWithDuplication.iterator(), SCHEMA);
List<Row> result = Lists.newArrayList(iterator);
- assertEquals(
- "Duplicate rows should not be removed",
+ assertEquals("Rows should match.", expectedRows, rowsToJava(result));
+
+ iterator = ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA);
+ result = Lists.newArrayList(iterator);
+
+ assertEquals("Rows should match.", expectedRows, rowsToJava(result));
+ }
+
+ @Test
+ public void testRemoveNetCarryovers() {
+ List<Row> rowsWithDuplication =
Lists.newArrayList(
- new Object[] {1, "d", "data", DELETE}, new Object[] {1, "d",
"data", DELETE}),
- rowsToJava(result));
+ // this row is different from the other rows; it is a net change and should be kept
+ new GenericRowWithSchema(new Object[] {0, "d", "data", DELETE, 0, 0}, null),
+ // a pair of delete and insert rows, should be removed
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 0, 0}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 0, 0}, null),
+ // 2 delete rows and 2 insert rows, should be removed
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 1, 1}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 1, 1}, null),
+ // a pair of insert and delete rows across snapshots, should be removed
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 2, 2}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", DELETE, 3, 3}, null),
+ // extra insert rows, they are net changes, should be kept
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null),
+ new GenericRowWithSchema(new Object[] {1, "d", "data", INSERT, 4, 4}, null),
+ // different key, net changes, should be kept
+ new GenericRowWithSchema(new Object[] {2, "d", "data", DELETE, 4, 4}, null));
+
+ List<Object[]> expectedRows =
+ Lists.newArrayList(
+ new Object[] {0, "d", "data", DELETE, 0, 0},
+ new Object[] {1, "d", "data", INSERT, 4, 4},
+ new Object[] {1, "d", "data", INSERT, 4, 4},
+ new Object[] {2, "d", "data", DELETE, 4, 4});
+
+ Iterator<Row> iterator =
+ ChangelogIterator.removeNetCarryovers(rowsWithDuplication.iterator(), SCHEMA);
+ List<Row> result = Lists.newArrayList(iterator);
+
+ assertEquals("Rows should match.", expectedRows, rowsToJava(result));
}
}