This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new fc5b2b336c Core: Use 'delete' if RowDelta only has delete files (#10123)
fc5b2b336c is described below
commit fc5b2b336c774b0b8b032f7d87a1fb21e76b3f20
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Apr 16 09:56:33 2024 +0200
Core: Use 'delete' if RowDelta only has delete files (#10123)
---
.../main/java/org/apache/iceberg/BaseRowDelta.java | 4 ++++
.../org/apache/iceberg/TestCommitReporting.java | 2 +-
.../test/java/org/apache/iceberg/TestRowDelta.java | 12 ++++++++++
.../SparkRowLevelOperationsTestBase.java | 14 +++++++----
.../iceberg/spark/extensions/TestDelete.java | 27 ++++++++++++++++++++++
.../spark/source/TestStructuredStreamingRead3.java | 13 ++++++++++-
.../SparkRowLevelOperationsTestBase.java | 13 +++++++----
.../iceberg/spark/extensions/TestDelete.java | 27 ++++++++++++++++++++++
.../spark/source/TestStructuredStreamingRead3.java | 13 ++++++++++-
.../SparkRowLevelOperationsTestBase.java | 14 +++++++----
.../iceberg/spark/extensions/TestDelete.java | 26 +++++++++++++++++++++
.../spark/source/TestStructuredStreamingRead3.java | 12 +++++++++-
12 files changed, 160 insertions(+), 17 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index a8e60045ac..42fd17f032 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -43,6 +43,10 @@ class BaseRowDelta extends MergingSnapshotProducer<RowDelta>
implements RowDelta
@Override
protected String operation() {
+ if (addsDeleteFiles() && !addsDataFiles()) {
+ return DataOperations.DELETE;
+ }
+
return DataOperations.OVERWRITE;
}
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
index 48b9c8d72b..bbba192fab 100644
--- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -102,7 +102,7 @@ public class TestCommitReporting extends TestBase {
CommitReport report = reporter.lastCommitReport();
assertThat(report).isNotNull();
- assertThat(report.operation()).isEqualTo("overwrite");
+ assertThat(report.operation()).isEqualTo("delete");
assertThat(report.snapshotId()).isEqualTo(1L);
assertThat(report.sequenceNumber()).isEqualTo(1L);
assertThat(report.tableName()).isEqualTo(tableName);
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 867e4b062f..a2a043e630 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -55,6 +55,18 @@ public class TestRowDelta extends V2TableTestBase {
return Arrays.asList(new Object[] {2, "main"}, new Object[] {2,
"testBranch"});
}
+ @TestTemplate
+ public void addOnlyDeleteFilesProducesDeleteOperation() {
+ SnapshotUpdate<?> rowDelta =
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES);
+
+ commit(table, rowDelta, branch);
+ Snapshot snap = latestSnapshot(table, branch);
+ assertThat(snap.sequenceNumber()).isEqualTo(1);
+ assertThat(snap.operation()).isEqualTo(DataOperations.DELETE);
+ assertThat(snap.deleteManifests(table.io())).hasSize(1);
+ }
+
@TestTemplate
public void testAddDeleteFile() {
SnapshotUpdate<?> rowDelta =
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index 9a1257ba39..7398e38300 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -30,6 +30,7 @@ import static
org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -255,8 +256,9 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
String changedPartitionCount,
String addedDeleteFiles,
String addedDataFiles) {
+ String operation = null == addedDataFiles && null != addedDeleteFiles ?
DELETE : OVERWRITE;
validateSnapshot(
- snapshot, OVERWRITE, changedPartitionCount, null, addedDeleteFiles,
addedDataFiles);
+ snapshot, operation, changedPartitionCount, null, addedDeleteFiles,
addedDataFiles);
}
protected void validateSnapshot(
@@ -286,9 +288,13 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
}
protected void validateProperty(Snapshot snapshot, String property, String
expectedValue) {
- String actual = snapshot.summary().get(property);
- Assert.assertEquals(
- "Snapshot property " + property + " has unexpected value.",
expectedValue, actual);
+ if (null == expectedValue) {
+ assertThat(snapshot.summary()).doesNotContainKey(property);
+ } else {
+ assertThat(snapshot.summary())
+ .as("Snapshot property " + property + " has unexpected value.")
+ .containsEntry(property, expectedValue);
+ }
}
protected void sleep(long millis) {
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 6020e00b32..cdc508f985 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -19,12 +19,14 @@
package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import java.util.Arrays;
@@ -334,6 +336,31 @@ public abstract class TestDelete extends
SparkRowLevelOperationsTestBase {
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget()));
}
+ @Test
+ public void deleteSingleRecordProducesDeleteOperation() throws
NoSuchTableException {
+ createAndInitPartitionedTable();
+ append(tableName, new Employee(1, "eng"), new Employee(2, "eng"), new
Employee(3, "eng"));
+
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot currentSnapshot = table.currentSnapshot();
+
+ if (mode(table) == COPY_ON_WRITE) {
+ // this is an OverwriteFiles and produces "overwrite"
+ validateCopyOnWrite(currentSnapshot, "1", "1", "1");
+ } else {
+ // this is a RowDelta that produces a "delete" instead of "overwrite"
+ validateMergeOnRead(currentSnapshot, "1", "1", null);
+ validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
+ }
+
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .containsExactlyInAnyOrder(row(1, "eng"), row(3, "eng"));
+ }
+
@Test
public void testDeleteWithoutCondition() {
createAndInitPartitionedTable();
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index a2d0c9acaf..c706603d06 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,8 +31,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -476,7 +479,15 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
dataDeletes,
deleteRowSchema);
- table.newRowDelta().addDeletes(eqDeletes).commit();
+ DataFile dataFile =
+ DataFiles.builder(table.spec())
+ .withPath(temp.newFile().toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+
+ table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
// check pre-condition - that the above Delete file write - actually resulted in snapshot of
// type OVERWRITE
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index e4605a7658..463cf2a47d 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -292,8 +292,9 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
String changedPartitionCount,
String addedDeleteFiles,
String addedDataFiles) {
+ String operation = null == addedDataFiles && null != addedDeleteFiles ?
DELETE : OVERWRITE;
validateSnapshot(
- snapshot, OVERWRITE, changedPartitionCount, null, addedDeleteFiles,
addedDataFiles);
+ snapshot, operation, changedPartitionCount, null, addedDeleteFiles,
addedDataFiles);
}
protected void validateSnapshot(
@@ -323,9 +324,13 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
}
protected void validateProperty(Snapshot snapshot, String property, String
expectedValue) {
- String actual = snapshot.summary().get(property);
- Assert.assertEquals(
- "Snapshot property " + property + " has unexpected value.",
expectedValue, actual);
+ if (null == expectedValue) {
+ assertThat(snapshot.summary()).doesNotContainKey(property);
+ } else {
+ assertThat(snapshot.summary())
+ .as("Snapshot property " + property + " has unexpected value.")
+ .containsEntry(property, expectedValue);
+ }
}
protected void sleep(long millis) {
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index b901d567f6..731dedbd48 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
@@ -27,6 +28,7 @@ import static
org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import java.util.Arrays;
@@ -502,6 +504,31 @@ public abstract class TestDelete extends
SparkRowLevelOperationsTestBase {
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget()));
}
+ @Test
+ public void deleteSingleRecordProducesDeleteOperation() throws
NoSuchTableException {
+ createAndInitPartitionedTable();
+ append(tableName, new Employee(1, "eng"), new Employee(2, "eng"), new
Employee(3, "eng"));
+
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot currentSnapshot = table.currentSnapshot();
+
+ if (mode(table) == COPY_ON_WRITE) {
+ // this is an OverwriteFiles and produces "overwrite"
+ validateCopyOnWrite(currentSnapshot, "1", "1", "1");
+ } else {
+ // this is a RowDelta that produces a "delete" instead of "overwrite"
+ validateMergeOnRead(currentSnapshot, "1", "1", null);
+ validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
+ }
+
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .containsExactlyInAnyOrder(row(1, "eng"), row(3, "eng"));
+ }
+
@Test
public void testDeleteWithoutCondition() {
createAndInitPartitionedTable();
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 1c08744f56..c1bbf304fa 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -30,8 +30,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -483,7 +486,15 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
dataDeletes,
deleteRowSchema);
- table.newRowDelta().addDeletes(eqDeletes).commit();
+ DataFile dataFile =
+ DataFiles.builder(table.spec())
+ .withPath(temp.newFile().toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+
+ table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
// check pre-condition - that the above Delete file write - actually resulted in snapshot of
// type OVERWRITE
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index ea1040dcf0..a926388e4a 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -285,8 +285,9 @@ public abstract class SparkRowLevelOperationsTestBase
extends ExtensionsTestBase
String changedPartitionCount,
String addedDeleteFiles,
String addedDataFiles) {
+ String operation = null == addedDataFiles && null != addedDeleteFiles ?
DELETE : OVERWRITE;
validateSnapshot(
- snapshot, OVERWRITE, changedPartitionCount, null, addedDeleteFiles,
addedDataFiles);
+ snapshot, operation, changedPartitionCount, null, addedDeleteFiles,
addedDataFiles);
}
protected void validateSnapshot(
@@ -317,10 +318,13 @@ public abstract class SparkRowLevelOperationsTestBase
extends ExtensionsTestBase
}
protected void validateProperty(Snapshot snapshot, String property, String
expectedValue) {
- String actual = snapshot.summary().get(property);
- assertThat(actual)
- .as("Snapshot property " + property + " has unexpected value.")
- .isEqualTo(expectedValue);
+ if (null == expectedValue) {
+ assertThat(snapshot.summary()).doesNotContainKey(property);
+ } else {
+ assertThat(snapshot.summary())
+ .as("Snapshot property " + property + " has unexpected value.")
+ .containsEntry(property, expectedValue);
+ }
}
protected void sleep(long millis) {
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index e9a8c13be5..05887d2a8b 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
@@ -501,6 +502,31 @@ public abstract class TestDelete extends
SparkRowLevelOperationsTestBase {
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget()));
}
+ @TestTemplate
+ public void deleteSingleRecordProducesDeleteOperation() throws
NoSuchTableException {
+ createAndInitPartitionedTable();
+ append(tableName, new Employee(1, "eng"), new Employee(2, "eng"), new
Employee(3, "eng"));
+
+ sql("DELETE FROM %s WHERE id = 2", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot currentSnapshot = table.currentSnapshot();
+
+ if (mode(table) == COPY_ON_WRITE) {
+ // this is an OverwriteFiles and produces "overwrite"
+ validateCopyOnWrite(currentSnapshot, "1", "1", "1");
+ } else {
+ // this is a RowDelta that produces a "delete" instead of "overwrite"
+ validateMergeOnRead(currentSnapshot, "1", "1", null);
+ validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
+ }
+
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .containsExactlyInAnyOrder(row(1, "eng"), row(3, "eng"));
+ }
+
@TestTemplate
public void testDeleteWithoutCondition() {
createAndInitPartitionedTable();
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 22e7df0f4e..a5bcf53bd7 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -32,8 +32,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RewriteFiles;
@@ -484,7 +486,15 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
dataDeletes,
deleteRowSchema);
- table.newRowDelta().addDeletes(eqDeletes).commit();
+ DataFile dataFile =
+ DataFiles.builder(table.spec())
+ .withPath(File.createTempFile("junit", null,
temp.toFile()).getPath())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+
+ table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
// check pre-condition - that the above Delete file write - actually resulted in snapshot of
// type OVERWRITE