This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2424e2c31d Flink: Port #10992 to v1.19 (#10994)
2424e2c31d is described below

commit 2424e2c31dbee75d642fed4c10bf5bd9ebbe09bd
Author: pvary <[email protected]>
AuthorDate: Fri Aug 23 17:51:00 2024 +0200

    Flink: Port #10992 to v1.19 (#10994)
---
 .../maintenance/operator/JdbcLockFactory.java      |  14 +-
 .../flink/maintenance/operator/TableChange.java    | 193 ++++++++++++++-------
 .../maintenance/operator/TriggerEvaluator.java     |  85 +++++----
 .../maintenance/operator/TestMonitorSource.java    |  27 +--
 .../maintenance/operator/TestTriggerManager.java   | 182 ++++++++++++-------
 5 files changed, 329 insertions(+), 172 deletions(-)

diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
index 21c8935abe..f22be33aea 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
@@ -105,12 +105,12 @@ public class JdbcLockFactory implements 
TriggerLockFactory {
 
   @Override
   public Lock createLock() {
-    return new Lock(pool, lockId, Type.MAINTENANCE);
+    return new JdbcLock(pool, lockId, Type.MAINTENANCE);
   }
 
   @Override
   public Lock createRecoveryLock() {
-    return new Lock(pool, lockId, Type.RECOVERY);
+    return new JdbcLock(pool, lockId, Type.RECOVERY);
   }
 
   @Override
@@ -153,12 +153,12 @@ public class JdbcLockFactory implements 
TriggerLockFactory {
     }
   }
 
-  public static class Lock implements TriggerLockFactory.Lock {
+  private static class JdbcLock implements TriggerLockFactory.Lock {
     private final JdbcClientPool pool;
     private final String lockId;
     private final Type type;
 
-    public Lock(JdbcClientPool pool, String lockId, Type type) {
+    private JdbcLock(JdbcClientPool pool, String lockId, Type type) {
       this.pool = pool;
       this.lockId = lockId;
       this.type = type;
@@ -221,7 +221,7 @@ public class JdbcLockFactory implements TriggerLockFactory {
         throw new UncheckedInterruptedException(e, "Interrupted during 
isHeld");
       } catch (SQLException e) {
         // SQL exception happened when getting lock information
-        throw new UncheckedSQLException(e, "Failed to get lock information for 
%s", this);
+        throw new UncheckedSQLException(e, "Failed to check the state of the 
lock %s", this);
       }
     }
 
@@ -266,8 +266,6 @@ public class JdbcLockFactory implements TriggerLockFactory {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new UncheckedInterruptedException(e, "Interrupted during 
unlock");
-      } catch (UncheckedSQLException e) {
-        throw e;
       } catch (SQLException e) {
         // SQL exception happened when getting/updating lock information
         throw new UncheckedSQLException(e, "Failed to remove lock %s", this);
@@ -312,7 +310,7 @@ public class JdbcLockFactory implements TriggerLockFactory {
     MAINTENANCE("m"),
     RECOVERY("r");
 
-    private String key;
+    private final String key;
 
     Type(String key) {
       this.key = key;
diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
index 7d0b94e97d..5252cf61b0 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
@@ -29,19 +29,29 @@ import 
org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 /** Event describing changes in an Iceberg table */
 @Internal
 class TableChange {
-  private int dataFileNum;
-  private int deleteFileNum;
-  private long dataFileSize;
-  private long deleteFileSize;
-  private int commitNum;
-
-  private TableChange(
-      int dataFileNum, int deleteFileNum, long dataFileSize, long 
deleteFileSize, int commitNum) {
-    this.dataFileNum = dataFileNum;
-    this.deleteFileNum = deleteFileNum;
-    this.dataFileSize = dataFileSize;
-    this.deleteFileSize = deleteFileSize;
-    this.commitNum = commitNum;
+  private int dataFileCount;
+  private long dataFileSizeInBytes;
+  private int posDeleteFileCount;
+  private long posDeleteRecordCount;
+  private int eqDeleteFileCount;
+  private long eqDeleteRecordCount;
+  private int commitCount;
+
+  TableChange(
+      int dataFileCount,
+      long dataFileSizeInBytes,
+      int posDeleteFileCount,
+      long posDeleteRecordCount,
+      int eqDeleteFileCount,
+      long eqDeleteRecordCount,
+      int commitCount) {
+    this.dataFileCount = dataFileCount;
+    this.dataFileSizeInBytes = dataFileSizeInBytes;
+    this.posDeleteFileCount = posDeleteFileCount;
+    this.posDeleteRecordCount = posDeleteRecordCount;
+    this.eqDeleteFileCount = eqDeleteFileCount;
+    this.eqDeleteRecordCount = eqDeleteRecordCount;
+    this.commitCount = commitCount;
   }
 
   TableChange(Snapshot snapshot, FileIO io) {
@@ -50,67 +60,96 @@ class TableChange {
 
     dataFiles.forEach(
         dataFile -> {
-          this.dataFileNum++;
-          this.dataFileSize += dataFile.fileSizeInBytes();
+          this.dataFileCount++;
+          this.dataFileSizeInBytes += dataFile.fileSizeInBytes();
         });
 
     deleteFiles.forEach(
         deleteFile -> {
-          this.deleteFileNum++;
-          this.deleteFileSize += deleteFile.fileSizeInBytes();
+          switch (deleteFile.content()) {
+            case POSITION_DELETES:
+              this.posDeleteFileCount++;
+              this.posDeleteRecordCount += deleteFile.recordCount();
+              break;
+            case EQUALITY_DELETES:
+              this.eqDeleteFileCount++;
+              this.eqDeleteRecordCount += deleteFile.recordCount();
+              break;
+            default:
+              throw new IllegalArgumentException("Unexpected delete file 
content: " + deleteFile);
+          }
         });
 
-    this.commitNum = 1;
+    this.commitCount = 1;
   }
 
   static TableChange empty() {
-    return new TableChange(0, 0, 0L, 0L, 0);
+    return new TableChange(0, 0L, 0, 0L, 0, 0L, 0);
   }
 
   static Builder builder() {
     return new Builder();
   }
 
-  int dataFileNum() {
-    return dataFileNum;
+  int dataFileCount() {
+    return dataFileCount;
+  }
+
+  long dataFileSizeInBytes() {
+    return dataFileSizeInBytes;
   }
 
-  int deleteFileNum() {
-    return deleteFileNum;
+  int posDeleteFileCount() {
+    return posDeleteFileCount;
   }
 
-  long dataFileSize() {
-    return dataFileSize;
+  long posDeleteRecordCount() {
+    return posDeleteRecordCount;
   }
 
-  long deleteFileSize() {
-    return deleteFileSize;
+  int eqDeleteFileCount() {
+    return eqDeleteFileCount;
   }
 
-  public int commitNum() {
-    return commitNum;
+  long eqDeleteRecordCount() {
+    return eqDeleteRecordCount;
+  }
+
+  public int commitCount() {
+    return commitCount;
   }
 
   public void merge(TableChange other) {
-    this.dataFileNum += other.dataFileNum;
-    this.deleteFileNum += other.deleteFileNum;
-    this.dataFileSize += other.dataFileSize;
-    this.deleteFileSize += other.deleteFileSize;
-    this.commitNum += other.commitNum;
+    this.dataFileCount += other.dataFileCount;
+    this.dataFileSizeInBytes += other.dataFileSizeInBytes;
+    this.posDeleteFileCount += other.posDeleteFileCount;
+    this.posDeleteRecordCount += other.posDeleteRecordCount;
+    this.eqDeleteFileCount += other.eqDeleteFileCount;
+    this.eqDeleteRecordCount += other.eqDeleteRecordCount;
+    this.commitCount += other.commitCount;
   }
 
   TableChange copy() {
-    return new TableChange(dataFileNum, deleteFileNum, dataFileSize, 
deleteFileSize, commitNum);
+    return new TableChange(
+        dataFileCount,
+        dataFileSizeInBytes,
+        posDeleteFileCount,
+        posDeleteRecordCount,
+        eqDeleteFileCount,
+        eqDeleteRecordCount,
+        commitCount);
   }
 
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
-        .add("dataFileNum", dataFileNum)
-        .add("deleteFileNum", deleteFileNum)
-        .add("dataFileSize", dataFileSize)
-        .add("deleteFileSize", deleteFileSize)
-        .add("commitNum", commitNum)
+        .add("dataFileCount", dataFileCount)
+        .add("dataFileSizeInBytes", dataFileSizeInBytes)
+        .add("posDeleteFileCount", posDeleteFileCount)
+        .add("posDeleteRecordCount", posDeleteRecordCount)
+        .add("eqDeleteFileCount", eqDeleteFileCount)
+        .add("eqDeleteRecordCount", eqDeleteRecordCount)
+        .add("commitCount", commitCount)
         .toString();
   }
 
@@ -123,52 +162,80 @@ class TableChange {
     }
 
     TableChange that = (TableChange) other;
-    return dataFileNum == that.dataFileNum
-        && deleteFileNum == that.deleteFileNum
-        && dataFileSize == that.dataFileSize
-        && deleteFileSize == that.deleteFileSize
-        && commitNum == that.commitNum;
+    return dataFileCount == that.dataFileCount
+        && dataFileSizeInBytes == that.dataFileSizeInBytes
+        && posDeleteFileCount == that.posDeleteFileCount
+        && posDeleteRecordCount == that.posDeleteRecordCount
+        && eqDeleteFileCount == that.eqDeleteFileCount
+        && eqDeleteRecordCount == that.eqDeleteRecordCount
+        && commitCount == that.commitCount;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(dataFileNum, deleteFileNum, dataFileSize, 
deleteFileSize, commitNum);
+    return Objects.hash(
+        dataFileCount,
+        dataFileSizeInBytes,
+        posDeleteFileCount,
+        posDeleteRecordCount,
+        eqDeleteFileCount,
+        eqDeleteRecordCount,
+        commitCount);
   }
 
   static class Builder {
-    private int dataFileNum = 0;
-    private int deleteFileNum = 0;
-    private long dataFileSize = 0L;
-    private long deleteFileSize = 0L;
-    private int commitNum = 0;
-
-    public Builder dataFileNum(int newDataFileNum) {
-      this.dataFileNum = newDataFileNum;
+    private int dataFileCount = 0;
+    private long dataFileSizeInBytes = 0L;
+    private int posDeleteFileCount = 0;
+    private long posDeleteRecordCount = 0L;
+    private int eqDeleteFileCount = 0;
+    private long eqDeleteRecordCount = 0L;
+    private int commitCount = 0;
+
+    public Builder dataFileCount(int newDataFileCount) {
+      this.dataFileCount = newDataFileCount;
+      return this;
+    }
+
+    public Builder dataFileSizeInBytes(long newDataFileSizeInBytes) {
+      this.dataFileSizeInBytes = newDataFileSizeInBytes;
+      return this;
+    }
+
+    public Builder posDeleteFileCount(int newPosDeleteFileCount) {
+      this.posDeleteFileCount = newPosDeleteFileCount;
       return this;
     }
 
-    public Builder deleteFileNum(int newDeleteFileNum) {
-      this.deleteFileNum = newDeleteFileNum;
+    public Builder posDeleteRecordCount(long newPosDeleteRecordCount) {
+      this.posDeleteRecordCount = newPosDeleteRecordCount;
       return this;
     }
 
-    public Builder dataFileSize(long newDataFileSize) {
-      this.dataFileSize = newDataFileSize;
+    public Builder eqDeleteFileCount(int newEqDeleteFileCount) {
+      this.eqDeleteFileCount = newEqDeleteFileCount;
       return this;
     }
 
-    public Builder deleteFileSize(long newDeleteFileSize) {
-      this.deleteFileSize = newDeleteFileSize;
+    public Builder eqDeleteRecordCount(long newEqDeleteRecordCount) {
+      this.eqDeleteRecordCount = newEqDeleteRecordCount;
       return this;
     }
 
-    public Builder commitNum(int newCommitNum) {
-      this.commitNum = newCommitNum;
+    public Builder commitCount(int newCommitCount) {
+      this.commitCount = newCommitCount;
       return this;
     }
 
     public TableChange build() {
-      return new TableChange(dataFileNum, deleteFileNum, dataFileSize, 
deleteFileSize, commitNum);
+      return new TableChange(
+          dataFileCount,
+          dataFileSizeInBytes,
+          posDeleteFileCount,
+          posDeleteRecordCount,
+          eqDeleteFileCount,
+          eqDeleteRecordCount,
+          commitCount);
     }
   }
 }
diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
index 37e4e3afd4..dba33b22a4 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
@@ -40,15 +40,7 @@ class TriggerEvaluator implements Serializable {
 
   boolean check(TableChange event, long lastTimeMs, long currentTimeMs) {
     boolean result =
-        predicates.stream()
-            .anyMatch(
-                p -> {
-                  try {
-                    return p.evaluate(event, lastTimeMs, currentTimeMs);
-                  } catch (Exception e) {
-                    throw new RuntimeException("Error accessing state", e);
-                  }
-                });
+        predicates.stream().anyMatch(p -> p.evaluate(event, lastTimeMs, 
currentTimeMs));
     LOG.debug(
         "Checking event: {}, at {}, last: {} with result: {}",
         event,
@@ -59,29 +51,47 @@ class TriggerEvaluator implements Serializable {
   }
 
   static class Builder implements Serializable {
-    private Integer commitNumber;
-    private Integer fileNumber;
-    private Long fileSize;
-    private Integer deleteFileNumber;
+    private Integer dataFileCount;
+    private Long dataFileSizeInBytes;
+    private Integer posDeleteFileCount;
+    private Long posDeleteRecordCount;
+    private Integer eqDeleteFileCount;
+    private Long eqDeleteRecordCount;
+    private Integer commitCount;
     private Duration timeout;
 
-    Builder commitNumber(int newCommitNumber) {
-      this.commitNumber = newCommitNumber;
+    public Builder dataFileCount(int newDataFileCount) {
+      this.dataFileCount = newDataFileCount;
       return this;
     }
 
-    Builder fileNumber(int newFileNumber) {
-      this.fileNumber = newFileNumber;
+    public Builder dataFileSizeInBytes(long neDataFileSizeInBytes) {
+      this.dataFileSizeInBytes = neDataFileSizeInBytes;
       return this;
     }
 
-    Builder fileSize(long newFileSize) {
-      this.fileSize = newFileSize;
+    public Builder posDeleteFileCount(int newPosDeleteFileCount) {
+      this.posDeleteFileCount = newPosDeleteFileCount;
       return this;
     }
 
-    Builder deleteFileNumber(int newDeleteFileNumber) {
-      this.deleteFileNumber = newDeleteFileNumber;
+    public Builder posDeleteRecordCount(long newPosDeleteRecordCount) {
+      this.posDeleteRecordCount = newPosDeleteRecordCount;
+      return this;
+    }
+
+    public Builder eqDeleteFileCount(int newEqDeleteFileCount) {
+      this.eqDeleteFileCount = newEqDeleteFileCount;
+      return this;
+    }
+
+    public Builder eqDeleteRecordCount(long newEqDeleteRecordCount) {
+      this.eqDeleteRecordCount = newEqDeleteRecordCount;
+      return this;
+    }
+
+    public Builder commitCount(int newCommitCount) {
+      this.commitCount = newCommitCount;
       return this;
     }
 
@@ -92,24 +102,37 @@ class TriggerEvaluator implements Serializable {
 
     TriggerEvaluator build() {
       List<Predicate> predicates = Lists.newArrayList();
-      if (commitNumber != null) {
-        predicates.add((change, unused, unused2) -> change.commitNum() >= 
commitNumber);
+      if (dataFileCount != null) {
+        predicates.add((change, unused, unused2) -> change.dataFileCount() >= 
dataFileCount);
+      }
+
+      if (dataFileSizeInBytes != null) {
+        predicates.add(
+            (change, unused, unused2) -> change.dataFileSizeInBytes() >= 
dataFileSizeInBytes);
+      }
+
+      if (posDeleteFileCount != null) {
+        predicates.add(
+            (change, unused, unused2) -> change.posDeleteFileCount() >= 
posDeleteFileCount);
+      }
+
+      if (posDeleteRecordCount != null) {
+        predicates.add(
+            (change, unused, unused2) -> change.posDeleteRecordCount() >= 
posDeleteRecordCount);
       }
 
-      if (fileNumber != null) {
+      if (eqDeleteFileCount != null) {
         predicates.add(
-            (change, unused, unused2) ->
-                change.dataFileNum() + change.deleteFileNum() >= fileNumber);
+            (change, unused, unused2) -> change.eqDeleteFileCount() >= 
eqDeleteFileCount);
       }
 
-      if (fileSize != null) {
+      if (eqDeleteRecordCount != null) {
         predicates.add(
-            (change, unused, unused2) ->
-                change.dataFileSize() + change.deleteFileSize() >= fileSize);
+            (change, unused, unused2) -> change.eqDeleteRecordCount() >= 
eqDeleteRecordCount);
       }
 
-      if (deleteFileNumber != null) {
-        predicates.add((change, unused, unused2) -> change.deleteFileNum() >= 
deleteFileNumber);
+      if (commitCount != null) {
+        predicates.add((change, unused, unused2) -> change.commitCount() >= 
commitCount);
       }
 
       if (timeout != null) {
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
index 8c02601025..3aee053225 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
@@ -162,7 +162,11 @@ class TestMonitorSource extends OperatorTestBase {
 
                 // The first non-empty event should contain the expected value
                 return newEvent.equals(
-                    
TableChange.builder().dataFileNum(1).dataFileSize(size).commitNum(1).build());
+                    TableChange.builder()
+                        .dataFileCount(1)
+                        .dataFileSizeInBytes(size)
+                        .commitCount(1)
+                        .build());
               });
     } finally {
       closeJobClient(jobClient);
@@ -298,17 +302,17 @@ class TestMonitorSource extends OperatorTestBase {
         new MonitorSource.TableChangeIterator(tableLoader, null, 1);
 
     // For a single maxReadBack we only get a single change
-    assertThat(iterator.next().commitNum()).isEqualTo(1);
+    assertThat(iterator.next().commitCount()).isEqualTo(1);
 
     iterator = new MonitorSource.TableChangeIterator(tableLoader, null, 2);
 
     // Expecting 2 commits/snapshots for maxReadBack=2
-    assertThat(iterator.next().commitNum()).isEqualTo(2);
+    assertThat(iterator.next().commitCount()).isEqualTo(2);
 
     iterator = new MonitorSource.TableChangeIterator(tableLoader, null, 
Long.MAX_VALUE);
 
     // For maxReadBack Long.MAX_VALUE we get every change
-    assertThat(iterator.next().commitNum()).isEqualTo(3);
+    assertThat(iterator.next().commitCount()).isEqualTo(3);
   }
 
   @Test
@@ -323,7 +327,7 @@ class TestMonitorSource extends OperatorTestBase {
         new MonitorSource.TableChangeIterator(tableLoader, null, 
Long.MAX_VALUE);
 
     // Read the current snapshot
-    assertThat(iterator.next().commitNum()).isEqualTo(1);
+    assertThat(iterator.next().commitCount()).isEqualTo(1);
 
     // Create a DataOperations.REPLACE snapshot
     Table table = tableLoader.loadTable();
@@ -350,16 +354,17 @@ class TestMonitorSource extends OperatorTestBase {
         
Lists.newArrayList(table.currentSnapshot().addedDeleteFiles(table.io()).iterator());
 
     long dataSize = 
dataFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
-    long deleteSize = 
deleteFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
+    long deleteRecordCount = 
deleteFiles.stream().mapToLong(DeleteFile::recordCount).sum();
 
     TableChange newChange = previous.copy();
     newChange.merge(
         TableChange.builder()
-            .dataFileNum(dataFiles.size())
-            .dataFileSize(dataSize)
-            .deleteFileNum(deleteFiles.size())
-            .deleteFileSize(deleteSize)
-            .commitNum(1)
+            .dataFileCount(dataFiles.size())
+            .dataFileSizeInBytes(dataSize)
+            // Currently we only test with equality deletes
+            .eqDeleteFileCount(deleteFiles.size())
+            .eqDeleteRecordCount(deleteRecordCount)
+            .commitCount(1)
             .build());
     return newChange;
   }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
index 55e64f3e84..fba4a12d9c 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
@@ -81,95 +81,159 @@ class TestTriggerManager extends OperatorTestBase {
   }
 
   @Test
-  void testCommitNumber() throws Exception {
+  void testCommitCount() throws Exception {
+    TriggerManager manager =
+        manager(sql.tableLoader(TABLE_NAME), new 
TriggerEvaluator.Builder().commitCount(3).build());
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> 
testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(1).build(), 0);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(3).build(), 2);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(10).build(), 3);
+
+      // No trigger in this case
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(1).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(1).build(), 3);
+
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(1).build(), 4);
+    }
+  }
+
+  @Test
+  void testDataFileCount() throws Exception {
     TriggerManager manager =
         manager(
-            sql.tableLoader(TABLE_NAME), new 
TriggerEvaluator.Builder().commitNumber(3).build());
+            sql.tableLoader(TABLE_NAME), new 
TriggerEvaluator.Builder().dataFileCount(3).build());
     try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> 
testHarness =
         harness(manager)) {
       testHarness.open();
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(1).build(), 0);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(2).build(), 1);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(3).build(), 2);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(10).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileCount(1).build(), 0);
+
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileCount(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileCount(3).build(), 2);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileCount(5).build(), 3);
 
       // No trigger in this case
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(1).build(), 3);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(1).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileCount(1).build(), 3);
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(1).build(), 4);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileCount(2).build(), 4);
     }
   }
 
   @Test
-  void testFileNumber() throws Exception {
+  void testDataFileSizeInBytes() throws Exception {
     TriggerManager manager =
-        manager(sql.tableLoader(TABLE_NAME), new 
TriggerEvaluator.Builder().fileNumber(3).build());
+        manager(
+            sql.tableLoader(TABLE_NAME),
+            new TriggerEvaluator.Builder().dataFileSizeInBytes(3).build());
     try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> 
testHarness =
         harness(manager)) {
       testHarness.open();
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileNum(1).build(), 0);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSizeInBytes(1L).build(), 0);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSizeInBytes(2L).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSizeInBytes(5L).build(), 2);
 
-      addEventAndCheckResult(
-          testHarness, 
TableChange.builder().dataFileNum(1).deleteFileNum(1).build(), 1);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(3).build(), 2);
-      addEventAndCheckResult(
-          testHarness, 
TableChange.builder().dataFileNum(5).deleteFileNum(7).build(), 3);
+      // No trigger in this case
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSizeInBytes(1L).build(), 2);
+
+      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSizeInBytes(2L).build(), 3);
+    }
+  }
+
+  @Test
+  void testPosDeleteFileCount() throws Exception {
+    TriggerManager manager =
+        manager(
+            sql.tableLoader(TABLE_NAME),
+            new TriggerEvaluator.Builder().posDeleteFileCount(3).build());
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> 
testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      addEventAndCheckResult(testHarness, 
TableChange.builder().posDeleteFileCount(1).build(), 0);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().posDeleteFileCount(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().posDeleteFileCount(3).build(), 2);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().posDeleteFileCount(10).build(), 3);
 
       // No trigger in this case
-      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileNum(1).build(), 3);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(1).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().posDeleteFileCount(1).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().posDeleteFileCount(1).build(), 3);
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileNum(1).build(), 4);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().posDeleteFileCount(1).build(), 4);
     }
   }
 
   @Test
-  void testFileSize() throws Exception {
+  void testPosDeleteRecordCount() throws Exception {
     TriggerManager manager =
-        manager(sql.tableLoader(TABLE_NAME), new 
TriggerEvaluator.Builder().fileSize(3).build());
+        manager(
+            sql.tableLoader(TABLE_NAME),
+            new TriggerEvaluator.Builder().posDeleteRecordCount(3).build());
     try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> 
testHarness =
         harness(manager)) {
       testHarness.open();
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSize(1L).build(), 0);
       addEventAndCheckResult(
-          testHarness, 
TableChange.builder().dataFileSize(1L).deleteFileSize(1L).build(), 1);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileSize(3L).build(), 2);
+          testHarness, TableChange.builder().posDeleteRecordCount(1L).build(), 
0);
       addEventAndCheckResult(
-          testHarness, 
TableChange.builder().dataFileSize(5L).deleteFileSize(7L).build(), 3);
+          testHarness, TableChange.builder().posDeleteRecordCount(2L).build(), 
1);
+      addEventAndCheckResult(
+          testHarness, TableChange.builder().posDeleteRecordCount(5L).build(), 
2);
 
       // No trigger in this case
-      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSize(1L).build(), 3);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileSize(1L).build(), 3);
+      addEventAndCheckResult(
+          testHarness, TableChange.builder().posDeleteRecordCount(1L).build(), 
2);
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().dataFileSize(1L).build(), 4);
+      addEventAndCheckResult(
+          testHarness, TableChange.builder().posDeleteRecordCount(2L).build(), 
3);
     }
   }
 
   @Test
-  void testDeleteFileNumber() throws Exception {
+  void testEqDeleteFileCount() throws Exception {
     TriggerManager manager =
         manager(
             sql.tableLoader(TABLE_NAME),
-            new TriggerEvaluator.Builder().deleteFileNumber(3).build());
+            new TriggerEvaluator.Builder().eqDeleteFileCount(3).build());
     try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> 
testHarness =
         harness(manager)) {
       testHarness.open();
 
-      addEventAndCheckResult(
-          testHarness, 
TableChange.builder().dataFileNum(3).deleteFileNum(1).build(), 0);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(2).build(), 1);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(3).build(), 2);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(10).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteFileCount(1).build(), 0);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteFileCount(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteFileCount(3).build(), 2);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteFileCount(10).build(), 3);
+
+      // No trigger in this case
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteFileCount(1).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteFileCount(1).build(), 3);
+
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteFileCount(1).build(), 4);
+    }
+  }
+
+  @Test
+  void testEqDeleteRecordCount() throws Exception {
+    TriggerManager manager =
+        manager(
+            sql.tableLoader(TABLE_NAME),
+            new TriggerEvaluator.Builder().eqDeleteRecordCount(3).build());
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> 
testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteRecordCount(1L).build(), 0);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteRecordCount(2L).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteRecordCount(5L).build(), 2);
 
       // No trigger in this case
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(1).build(), 3);
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(1).build(), 3);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteRecordCount(1L).build(), 2);
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().deleteFileNum(1).build(), 4);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().eqDeleteRecordCount(2L).build(), 3);
     }
   }
 
@@ -183,7 +247,7 @@ class TestTriggerManager extends OperatorTestBase {
         harness(manager)) {
       testHarness.open();
 
-      TableChange event = 
TableChange.builder().dataFileSize(1).commitNum(1).build();
+      TableChange event = 
TableChange.builder().dataFileCount(1).commitCount(1).build();
 
       // Wait for some time
       testHarness.processElement(event, EVENT_TIME);
@@ -225,7 +289,7 @@ class TestTriggerManager extends OperatorTestBase {
       testHarness.open();
 
       testHarness.processElement(
-          TableChange.builder().dataFileSize(1).commitNum(1).build(), 
EVENT_TIME);
+          TableChange.builder().dataFileCount(1).commitCount(1).build(), 
EVENT_TIME);
 
       assertThat(testHarness.extractOutputValues()).isEmpty();
 
@@ -240,7 +304,7 @@ class TestTriggerManager extends OperatorTestBase {
       testHarness.open();
 
       // Arrives the first real change which triggers the recovery process
-      testHarness.processElement(TableChange.builder().commitNum(1).build(), 
EVENT_TIME_2);
+      testHarness.processElement(TableChange.builder().commitCount(1).build(), 
EVENT_TIME_2);
       assertTriggers(
           testHarness.extractOutputValues(),
           
Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime())));
@@ -261,11 +325,11 @@ class TestTriggerManager extends OperatorTestBase {
         harness(manager)) {
       testHarness.open();
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(2).build(), 1);
       long currentTime = testHarness.getProcessingTime();
 
       // No new fire yet
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(2).build(), 1);
 
       // Check that the trigger fired after the delay
       testHarness.setProcessingTime(currentTime + DELAY);
@@ -281,11 +345,11 @@ class TestTriggerManager extends OperatorTestBase {
         harness(manager)) {
       testHarness.open();
 
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(2).build(), 1);
 
       // Create a lock to prevent execution, and check that there is no result
       assertThat(lock.tryLock()).isTrue();
-      addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(2).build(), 1);
+      addEventAndCheckResult(testHarness, 
TableChange.builder().commitCount(2).build(), 1);
       long currentTime = testHarness.getProcessingTime();
 
       // Remove the lock, and still no trigger
@@ -331,7 +395,7 @@ class TestTriggerManager extends OperatorTestBase {
       ++processingTime;
       expected.add(Trigger.recovery(processingTime));
       testHarness.setProcessingTime(processingTime);
-      testHarness.processElement(TableChange.builder().commitNum(2).build(), 
processingTime);
+      testHarness.processElement(TableChange.builder().commitCount(2).build(), 
processingTime);
       assertTriggers(testHarness.extractOutputValues(), expected);
 
       // Nothing happens until the recovery is finished
@@ -347,7 +411,7 @@ class TestTriggerManager extends OperatorTestBase {
       // Still no results as the recovery is ongoing
       ++processingTime;
       testHarness.setProcessingTime(processingTime);
-      testHarness.processElement(TableChange.builder().commitNum(2).build(), 
processingTime);
+      testHarness.processElement(TableChange.builder().commitCount(2).build(), 
processingTime);
       assertTriggers(testHarness.extractOutputValues(), expected);
 
       // Simulate the action of removing lock and recoveryLock by downstream 
lock cleaner when it
@@ -383,8 +447,8 @@ class TestTriggerManager extends OperatorTestBase {
             lockFactory,
             Lists.newArrayList(NAME_1, NAME_2),
             Lists.newArrayList(
-                new TriggerEvaluator.Builder().commitNumber(2).build(),
-                new TriggerEvaluator.Builder().commitNumber(4).build()),
+                new TriggerEvaluator.Builder().commitCount(2).build(),
+                new TriggerEvaluator.Builder().commitCount(4).build()),
             1L,
             1L);
     source
@@ -400,7 +464,7 @@ class TestTriggerManager extends OperatorTestBase {
       jobClient = env.executeAsync();
 
       // This one doesn't trigger - tests NOTHING_TO_TRIGGER
-      source.sendRecord(TableChange.builder().commitNum(1).build());
+      source.sendRecord(TableChange.builder().commitCount(1).build());
 
       Awaitility.await()
           .until(
@@ -412,7 +476,7 @@ class TestTriggerManager extends OperatorTestBase {
               });
 
       // Trigger one of the tasks - tests TRIGGERED
-      source.sendRecord(TableChange.builder().commitNum(1).build());
+      source.sendRecord(TableChange.builder().commitCount(1).build());
       // Wait until we receive the trigger
       assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
       assertThat(
@@ -421,7 +485,7 @@ class TestTriggerManager extends OperatorTestBase {
       lock.unlock();
 
       // Trigger both of the tasks - tests TRIGGERED
-      source.sendRecord(TableChange.builder().commitNum(2).build());
+      source.sendRecord(TableChange.builder().commitCount(2).build());
       // Wait until we receive the trigger
       assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
       lock.unlock();
@@ -472,14 +536,14 @@ class TestTriggerManager extends OperatorTestBase {
       jobClient = env.executeAsync();
 
       // Start the first trigger
-      source.sendRecord(TableChange.builder().commitNum(2).build());
+      source.sendRecord(TableChange.builder().commitCount(2).build());
       assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
 
       // Remove the lock to allow the next trigger
       lock.unlock();
 
       // The second trigger will be blocked
-      source.sendRecord(TableChange.builder().commitNum(2).build());
+      source.sendRecord(TableChange.builder().commitCount(2).build());
       Awaitility.await()
           .until(
               () ->
@@ -518,11 +582,11 @@ class TestTriggerManager extends OperatorTestBase {
       jobClient = env.executeAsync();
 
       // Start the first trigger - notice that we do not remove the lock after 
the trigger
-      source.sendRecord(TableChange.builder().commitNum(2).build());
+      source.sendRecord(TableChange.builder().commitCount(2).build());
       assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
 
       // The second trigger will be blocked by the lock
-      source.sendRecord(TableChange.builder().commitNum(2).build());
+      source.sendRecord(TableChange.builder().commitCount(2).build());
       Awaitility.await()
           .until(
               () ->
@@ -589,13 +653,13 @@ class TestTriggerManager extends OperatorTestBase {
         tableLoader,
         lockFactory,
         Lists.newArrayList(NAME_1),
-        Lists.newArrayList(new 
TriggerEvaluator.Builder().commitNumber(2).build()),
+        Lists.newArrayList(new 
TriggerEvaluator.Builder().commitCount(2).build()),
         minFireDelayMs,
         lockCheckDelayMs);
   }
 
   private TriggerManager manager(TableLoader tableLoader) {
-    return manager(tableLoader, new 
TriggerEvaluator.Builder().commitNumber(2).build());
+    return manager(tableLoader, new 
TriggerEvaluator.Builder().commitCount(2).build());
   }
 
   private static void assertTriggers(List<Trigger> expected, List<Trigger> 
actual) {


Reply via email to