rajarshisarkar commented on a change in pull request #3517:
URL: https://github.com/apache/iceberg/pull/3517#discussion_r794698122



##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -200,13 +202,25 @@ public void stop() {
 
   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-        "Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
-        snapshot.snapshotId(), 
SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-    Preconditions.checkState(
-        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) 
|| op.equals(DataOperations.REPLACE),
-        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), 
snapshot.snapshotId());
-    return op.equals(DataOperations.APPEND);
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(skipDelete,
+            "Cannot process delete snapshot : %s. Set read option %s to allow 
skipping snapshots of type delete",
+            snapshot.snapshotId(), 
SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+      case DataOperations.OVERWRITE:
+        Preconditions.checkState(skipOverwrite,
+            "Cannot process overwrite snapshot : %s. Set read option %s to 
allow skipping snapshots of type overwrite",
+            snapshot.snapshotId(), 
SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
+        return false;
+      default:
+        throw new IllegalStateException(String.format(
+            "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), 
snapshot.snapshotId()));

Review comment:
       Thanks, this message seems better.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -200,13 +202,25 @@ public void stop() {
 
   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-        "Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
-        snapshot.snapshotId(), 
SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-    Preconditions.checkState(
-        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) 
|| op.equals(DataOperations.REPLACE),
-        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), 
snapshot.snapshotId());
-    return op.equals(DataOperations.APPEND);
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(skipDelete,
+            "Cannot process delete snapshot : %s. Set read option %s to allow 
skipping snapshots of type delete",
+            snapshot.snapshotId(), 
SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+      case DataOperations.OVERWRITE:
+        Preconditions.checkState(skipOverwrite,
+            "Cannot process overwrite snapshot : %s. Set read option %s to 
allow skipping snapshots of type overwrite",

Review comment:
       Sure.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -200,13 +202,25 @@ public void stop() {
 
   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-        "Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
-        snapshot.snapshotId(), 
SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-    Preconditions.checkState(
-        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) 
|| op.equals(DataOperations.REPLACE),
-        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), 
snapshot.snapshotId());
-    return op.equals(DataOperations.APPEND);
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(skipDelete,
+            "Cannot process delete snapshot : %s. Set read option %s to allow 
skipping snapshots of type delete",

Review comment:
       Sure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to