This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2424e2c31d Flink: Port #10992 to v1.19 (#10994)
2424e2c31d is described below
commit 2424e2c31dbee75d642fed4c10bf5bd9ebbe09bd
Author: pvary <[email protected]>
AuthorDate: Fri Aug 23 17:51:00 2024 +0200
Flink: Port #10992 to v1.19 (#10994)
---
.../maintenance/operator/JdbcLockFactory.java | 14 +-
.../flink/maintenance/operator/TableChange.java | 193 ++++++++++++++-------
.../maintenance/operator/TriggerEvaluator.java | 85 +++++----
.../maintenance/operator/TestMonitorSource.java | 27 +--
.../maintenance/operator/TestTriggerManager.java | 182 ++++++++++++-------
5 files changed, 329 insertions(+), 172 deletions(-)
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
index 21c8935abe..f22be33aea 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
@@ -105,12 +105,12 @@ public class JdbcLockFactory implements
TriggerLockFactory {
@Override
public Lock createLock() {
- return new Lock(pool, lockId, Type.MAINTENANCE);
+ return new JdbcLock(pool, lockId, Type.MAINTENANCE);
}
@Override
public Lock createRecoveryLock() {
- return new Lock(pool, lockId, Type.RECOVERY);
+ return new JdbcLock(pool, lockId, Type.RECOVERY);
}
@Override
@@ -153,12 +153,12 @@ public class JdbcLockFactory implements
TriggerLockFactory {
}
}
- public static class Lock implements TriggerLockFactory.Lock {
+ private static class JdbcLock implements TriggerLockFactory.Lock {
private final JdbcClientPool pool;
private final String lockId;
private final Type type;
- public Lock(JdbcClientPool pool, String lockId, Type type) {
+ private JdbcLock(JdbcClientPool pool, String lockId, Type type) {
this.pool = pool;
this.lockId = lockId;
this.type = type;
@@ -221,7 +221,7 @@ public class JdbcLockFactory implements TriggerLockFactory {
throw new UncheckedInterruptedException(e, "Interrupted during
isHeld");
} catch (SQLException e) {
// SQL exception happened when getting lock information
- throw new UncheckedSQLException(e, "Failed to get lock information for
%s", this);
+ throw new UncheckedSQLException(e, "Failed to check the state of the
lock %s", this);
}
}
@@ -266,8 +266,6 @@ public class JdbcLockFactory implements TriggerLockFactory {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted during
unlock");
- } catch (UncheckedSQLException e) {
- throw e;
} catch (SQLException e) {
// SQL exception happened when getting/updating lock information
throw new UncheckedSQLException(e, "Failed to remove lock %s", this);
@@ -312,7 +310,7 @@ public class JdbcLockFactory implements TriggerLockFactory {
MAINTENANCE("m"),
RECOVERY("r");
- private String key;
+ private final String key;
Type(String key) {
this.key = key;
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
index 7d0b94e97d..5252cf61b0 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
@@ -29,19 +29,29 @@ import
org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
/** Event describing changes in an Iceberg table */
@Internal
class TableChange {
- private int dataFileNum;
- private int deleteFileNum;
- private long dataFileSize;
- private long deleteFileSize;
- private int commitNum;
-
- private TableChange(
- int dataFileNum, int deleteFileNum, long dataFileSize, long
deleteFileSize, int commitNum) {
- this.dataFileNum = dataFileNum;
- this.deleteFileNum = deleteFileNum;
- this.dataFileSize = dataFileSize;
- this.deleteFileSize = deleteFileSize;
- this.commitNum = commitNum;
+ private int dataFileCount;
+ private long dataFileSizeInBytes;
+ private int posDeleteFileCount;
+ private long posDeleteRecordCount;
+ private int eqDeleteFileCount;
+ private long eqDeleteRecordCount;
+ private int commitCount;
+
+ TableChange(
+ int dataFileCount,
+ long dataFileSizeInBytes,
+ int posDeleteFileCount,
+ long posDeleteRecordCount,
+ int eqDeleteFileCount,
+ long eqDeleteRecordCount,
+ int commitCount) {
+ this.dataFileCount = dataFileCount;
+ this.dataFileSizeInBytes = dataFileSizeInBytes;
+ this.posDeleteFileCount = posDeleteFileCount;
+ this.posDeleteRecordCount = posDeleteRecordCount;
+ this.eqDeleteFileCount = eqDeleteFileCount;
+ this.eqDeleteRecordCount = eqDeleteRecordCount;
+ this.commitCount = commitCount;
}
TableChange(Snapshot snapshot, FileIO io) {
@@ -50,67 +60,96 @@ class TableChange {
dataFiles.forEach(
dataFile -> {
- this.dataFileNum++;
- this.dataFileSize += dataFile.fileSizeInBytes();
+ this.dataFileCount++;
+ this.dataFileSizeInBytes += dataFile.fileSizeInBytes();
});
deleteFiles.forEach(
deleteFile -> {
- this.deleteFileNum++;
- this.deleteFileSize += deleteFile.fileSizeInBytes();
+ switch (deleteFile.content()) {
+ case POSITION_DELETES:
+ this.posDeleteFileCount++;
+ this.posDeleteRecordCount += deleteFile.recordCount();
+ break;
+ case EQUALITY_DELETES:
+ this.eqDeleteFileCount++;
+ this.eqDeleteRecordCount += deleteFile.recordCount();
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected delete file
content: " + deleteFile);
+ }
});
- this.commitNum = 1;
+ this.commitCount = 1;
}
static TableChange empty() {
- return new TableChange(0, 0, 0L, 0L, 0);
+ return new TableChange(0, 0L, 0, 0L, 0, 0L, 0);
}
static Builder builder() {
return new Builder();
}
- int dataFileNum() {
- return dataFileNum;
+ int dataFileCount() {
+ return dataFileCount;
+ }
+
+ long dataFileSizeInBytes() {
+ return dataFileSizeInBytes;
}
- int deleteFileNum() {
- return deleteFileNum;
+ int posDeleteFileCount() {
+ return posDeleteFileCount;
}
- long dataFileSize() {
- return dataFileSize;
+ long posDeleteRecordCount() {
+ return posDeleteRecordCount;
}
- long deleteFileSize() {
- return deleteFileSize;
+ int eqDeleteFileCount() {
+ return eqDeleteFileCount;
}
- public int commitNum() {
- return commitNum;
+ long eqDeleteRecordCount() {
+ return eqDeleteRecordCount;
+ }
+
+ public int commitCount() {
+ return commitCount;
}
public void merge(TableChange other) {
- this.dataFileNum += other.dataFileNum;
- this.deleteFileNum += other.deleteFileNum;
- this.dataFileSize += other.dataFileSize;
- this.deleteFileSize += other.deleteFileSize;
- this.commitNum += other.commitNum;
+ this.dataFileCount += other.dataFileCount;
+ this.dataFileSizeInBytes += other.dataFileSizeInBytes;
+ this.posDeleteFileCount += other.posDeleteFileCount;
+ this.posDeleteRecordCount += other.posDeleteRecordCount;
+ this.eqDeleteFileCount += other.eqDeleteFileCount;
+ this.eqDeleteRecordCount += other.eqDeleteRecordCount;
+ this.commitCount += other.commitCount;
}
TableChange copy() {
- return new TableChange(dataFileNum, deleteFileNum, dataFileSize,
deleteFileSize, commitNum);
+ return new TableChange(
+ dataFileCount,
+ dataFileSizeInBytes,
+ posDeleteFileCount,
+ posDeleteRecordCount,
+ eqDeleteFileCount,
+ eqDeleteRecordCount,
+ commitCount);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("dataFileNum", dataFileNum)
- .add("deleteFileNum", deleteFileNum)
- .add("dataFileSize", dataFileSize)
- .add("deleteFileSize", deleteFileSize)
- .add("commitNum", commitNum)
+ .add("dataFileCount", dataFileCount)
+ .add("dataFileSizeInBytes", dataFileSizeInBytes)
+ .add("posDeleteFileCount", posDeleteFileCount)
+ .add("posDeleteRecordCount", posDeleteRecordCount)
+ .add("eqDeleteFileCount", eqDeleteFileCount)
+ .add("eqDeleteRecordCount", eqDeleteRecordCount)
+ .add("commitCount", commitCount)
.toString();
}
@@ -123,52 +162,80 @@ class TableChange {
}
TableChange that = (TableChange) other;
- return dataFileNum == that.dataFileNum
- && deleteFileNum == that.deleteFileNum
- && dataFileSize == that.dataFileSize
- && deleteFileSize == that.deleteFileSize
- && commitNum == that.commitNum;
+ return dataFileCount == that.dataFileCount
+ && dataFileSizeInBytes == that.dataFileSizeInBytes
+ && posDeleteFileCount == that.posDeleteFileCount
+ && posDeleteRecordCount == that.posDeleteRecordCount
+ && eqDeleteFileCount == that.eqDeleteFileCount
+ && eqDeleteRecordCount == that.eqDeleteRecordCount
+ && commitCount == that.commitCount;
}
@Override
public int hashCode() {
- return Objects.hash(dataFileNum, deleteFileNum, dataFileSize,
deleteFileSize, commitNum);
+ return Objects.hash(
+ dataFileCount,
+ dataFileSizeInBytes,
+ posDeleteFileCount,
+ posDeleteRecordCount,
+ eqDeleteFileCount,
+ eqDeleteRecordCount,
+ commitCount);
}
static class Builder {
- private int dataFileNum = 0;
- private int deleteFileNum = 0;
- private long dataFileSize = 0L;
- private long deleteFileSize = 0L;
- private int commitNum = 0;
-
- public Builder dataFileNum(int newDataFileNum) {
- this.dataFileNum = newDataFileNum;
+ private int dataFileCount = 0;
+ private long dataFileSizeInBytes = 0L;
+ private int posDeleteFileCount = 0;
+ private long posDeleteRecordCount = 0L;
+ private int eqDeleteFileCount = 0;
+ private long eqDeleteRecordCount = 0L;
+ private int commitCount = 0;
+
+ public Builder dataFileCount(int newDataFileCount) {
+ this.dataFileCount = newDataFileCount;
+ return this;
+ }
+
+ public Builder dataFileSizeInBytes(long newDataFileSizeInBytes) {
+ this.dataFileSizeInBytes = newDataFileSizeInBytes;
+ return this;
+ }
+
+ public Builder posDeleteFileCount(int newPosDeleteFileCount) {
+ this.posDeleteFileCount = newPosDeleteFileCount;
return this;
}
- public Builder deleteFileNum(int newDeleteFileNum) {
- this.deleteFileNum = newDeleteFileNum;
+ public Builder posDeleteRecordCount(long newPosDeleteRecordCount) {
+ this.posDeleteRecordCount = newPosDeleteRecordCount;
return this;
}
- public Builder dataFileSize(long newDataFileSize) {
- this.dataFileSize = newDataFileSize;
+ public Builder eqDeleteFileCount(int newEqDeleteFileCount) {
+ this.eqDeleteFileCount = newEqDeleteFileCount;
return this;
}
- public Builder deleteFileSize(long newDeleteFileSize) {
- this.deleteFileSize = newDeleteFileSize;
+ public Builder eqDeleteRecordCount(long newEqDeleteRecordCount) {
+ this.eqDeleteRecordCount = newEqDeleteRecordCount;
return this;
}
- public Builder commitNum(int newCommitNum) {
- this.commitNum = newCommitNum;
+ public Builder commitCount(int newCommitCount) {
+ this.commitCount = newCommitCount;
return this;
}
public TableChange build() {
- return new TableChange(dataFileNum, deleteFileNum, dataFileSize,
deleteFileSize, commitNum);
+ return new TableChange(
+ dataFileCount,
+ dataFileSizeInBytes,
+ posDeleteFileCount,
+ posDeleteRecordCount,
+ eqDeleteFileCount,
+ eqDeleteRecordCount,
+ commitCount);
}
}
}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
index 37e4e3afd4..dba33b22a4 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
@@ -40,15 +40,7 @@ class TriggerEvaluator implements Serializable {
boolean check(TableChange event, long lastTimeMs, long currentTimeMs) {
boolean result =
- predicates.stream()
- .anyMatch(
- p -> {
- try {
- return p.evaluate(event, lastTimeMs, currentTimeMs);
- } catch (Exception e) {
- throw new RuntimeException("Error accessing state", e);
- }
- });
+ predicates.stream().anyMatch(p -> p.evaluate(event, lastTimeMs,
currentTimeMs));
LOG.debug(
"Checking event: {}, at {}, last: {} with result: {}",
event,
@@ -59,29 +51,47 @@ class TriggerEvaluator implements Serializable {
}
static class Builder implements Serializable {
- private Integer commitNumber;
- private Integer fileNumber;
- private Long fileSize;
- private Integer deleteFileNumber;
+ private Integer dataFileCount;
+ private Long dataFileSizeInBytes;
+ private Integer posDeleteFileCount;
+ private Long posDeleteRecordCount;
+ private Integer eqDeleteFileCount;
+ private Long eqDeleteRecordCount;
+ private Integer commitCount;
private Duration timeout;
- Builder commitNumber(int newCommitNumber) {
- this.commitNumber = newCommitNumber;
+ public Builder dataFileCount(int newDataFileCount) {
+ this.dataFileCount = newDataFileCount;
return this;
}
- Builder fileNumber(int newFileNumber) {
- this.fileNumber = newFileNumber;
+ public Builder dataFileSizeInBytes(long neDataFileSizeInBytes) {
+ this.dataFileSizeInBytes = neDataFileSizeInBytes;
return this;
}
- Builder fileSize(long newFileSize) {
- this.fileSize = newFileSize;
+ public Builder posDeleteFileCount(int newPosDeleteFileCount) {
+ this.posDeleteFileCount = newPosDeleteFileCount;
return this;
}
- Builder deleteFileNumber(int newDeleteFileNumber) {
- this.deleteFileNumber = newDeleteFileNumber;
+ public Builder posDeleteRecordCount(long newPosDeleteRecordCount) {
+ this.posDeleteRecordCount = newPosDeleteRecordCount;
+ return this;
+ }
+
+ public Builder eqDeleteFileCount(int newEqDeleteFileCount) {
+ this.eqDeleteFileCount = newEqDeleteFileCount;
+ return this;
+ }
+
+ public Builder eqDeleteRecordCount(long newEqDeleteRecordCount) {
+ this.eqDeleteRecordCount = newEqDeleteRecordCount;
+ return this;
+ }
+
+ public Builder commitCount(int newCommitCount) {
+ this.commitCount = newCommitCount;
return this;
}
@@ -92,24 +102,37 @@ class TriggerEvaluator implements Serializable {
TriggerEvaluator build() {
List<Predicate> predicates = Lists.newArrayList();
- if (commitNumber != null) {
- predicates.add((change, unused, unused2) -> change.commitNum() >=
commitNumber);
+ if (dataFileCount != null) {
+ predicates.add((change, unused, unused2) -> change.dataFileCount() >=
dataFileCount);
+ }
+
+ if (dataFileSizeInBytes != null) {
+ predicates.add(
+ (change, unused, unused2) -> change.dataFileSizeInBytes() >=
dataFileSizeInBytes);
+ }
+
+ if (posDeleteFileCount != null) {
+ predicates.add(
+ (change, unused, unused2) -> change.posDeleteFileCount() >=
posDeleteFileCount);
+ }
+
+ if (posDeleteRecordCount != null) {
+ predicates.add(
+ (change, unused, unused2) -> change.posDeleteRecordCount() >=
posDeleteRecordCount);
}
- if (fileNumber != null) {
+ if (eqDeleteFileCount != null) {
predicates.add(
- (change, unused, unused2) ->
- change.dataFileNum() + change.deleteFileNum() >= fileNumber);
+ (change, unused, unused2) -> change.eqDeleteFileCount() >=
eqDeleteFileCount);
}
- if (fileSize != null) {
+ if (eqDeleteRecordCount != null) {
predicates.add(
- (change, unused, unused2) ->
- change.dataFileSize() + change.deleteFileSize() >= fileSize);
+ (change, unused, unused2) -> change.eqDeleteRecordCount() >=
eqDeleteRecordCount);
}
- if (deleteFileNumber != null) {
- predicates.add((change, unused, unused2) -> change.deleteFileNum() >=
deleteFileNumber);
+ if (commitCount != null) {
+ predicates.add((change, unused, unused2) -> change.commitCount() >=
commitCount);
}
if (timeout != null) {
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
index 8c02601025..3aee053225 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
@@ -162,7 +162,11 @@ class TestMonitorSource extends OperatorTestBase {
// The first non-empty event should contain the expected value
return newEvent.equals(
-
TableChange.builder().dataFileNum(1).dataFileSize(size).commitNum(1).build());
+ TableChange.builder()
+ .dataFileCount(1)
+ .dataFileSizeInBytes(size)
+ .commitCount(1)
+ .build());
});
} finally {
closeJobClient(jobClient);
@@ -298,17 +302,17 @@ class TestMonitorSource extends OperatorTestBase {
new MonitorSource.TableChangeIterator(tableLoader, null, 1);
// For a single maxReadBack we only get a single change
- assertThat(iterator.next().commitNum()).isEqualTo(1);
+ assertThat(iterator.next().commitCount()).isEqualTo(1);
iterator = new MonitorSource.TableChangeIterator(tableLoader, null, 2);
// Expecting 2 commits/snapshots for maxReadBack=2
- assertThat(iterator.next().commitNum()).isEqualTo(2);
+ assertThat(iterator.next().commitCount()).isEqualTo(2);
iterator = new MonitorSource.TableChangeIterator(tableLoader, null,
Long.MAX_VALUE);
// For maxReadBack Long.MAX_VALUE we get every change
- assertThat(iterator.next().commitNum()).isEqualTo(3);
+ assertThat(iterator.next().commitCount()).isEqualTo(3);
}
@Test
@@ -323,7 +327,7 @@ class TestMonitorSource extends OperatorTestBase {
new MonitorSource.TableChangeIterator(tableLoader, null,
Long.MAX_VALUE);
// Read the current snapshot
- assertThat(iterator.next().commitNum()).isEqualTo(1);
+ assertThat(iterator.next().commitCount()).isEqualTo(1);
// Create a DataOperations.REPLACE snapshot
Table table = tableLoader.loadTable();
@@ -350,16 +354,17 @@ class TestMonitorSource extends OperatorTestBase {
Lists.newArrayList(table.currentSnapshot().addedDeleteFiles(table.io()).iterator());
long dataSize =
dataFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
- long deleteSize =
deleteFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
+ long deleteRecordCount =
deleteFiles.stream().mapToLong(DeleteFile::recordCount).sum();
TableChange newChange = previous.copy();
newChange.merge(
TableChange.builder()
- .dataFileNum(dataFiles.size())
- .dataFileSize(dataSize)
- .deleteFileNum(deleteFiles.size())
- .deleteFileSize(deleteSize)
- .commitNum(1)
+ .dataFileCount(dataFiles.size())
+ .dataFileSizeInBytes(dataSize)
+ // Currently we only test with equality deletes
+ .eqDeleteFileCount(deleteFiles.size())
+ .eqDeleteRecordCount(deleteRecordCount)
+ .commitCount(1)
.build());
return newChange;
}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
index 55e64f3e84..fba4a12d9c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
@@ -81,95 +81,159 @@ class TestTriggerManager extends OperatorTestBase {
}
@Test
- void testCommitNumber() throws Exception {
+ void testCommitCount() throws Exception {
+ TriggerManager manager =
+ manager(sql.tableLoader(TABLE_NAME), new
TriggerEvaluator.Builder().commitCount(3).build());
+ try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger>
testHarness =
+ harness(manager)) {
+ testHarness.open();
+
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(1).build(), 0);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(3).build(), 2);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(10).build(), 3);
+
+ // No trigger in this case
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(1).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(1).build(), 3);
+
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(1).build(), 4);
+ }
+ }
+
+ @Test
+ void testDataFileCount() throws Exception {
TriggerManager manager =
manager(
- sql.tableLoader(TABLE_NAME), new
TriggerEvaluator.Builder().commitNumber(3).build());
+ sql.tableLoader(TABLE_NAME), new
TriggerEvaluator.Builder().dataFileCount(3).build());
try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger>
testHarness =
harness(manager)) {
testHarness.open();
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(1).build(), 0);
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(2).build(), 1);
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(3).build(), 2);
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(10).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileCount(1).build(), 0);
+
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileCount(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileCount(3).build(), 2);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileCount(5).build(), 3);
// No trigger in this case
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(1).build(), 3);
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(1).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileCount(1).build(), 3);
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(1).build(), 4);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileCount(2).build(), 4);
}
}
@Test
- void testFileNumber() throws Exception {
+ void testDataFileSizeInBytes() throws Exception {
TriggerManager manager =
- manager(sql.tableLoader(TABLE_NAME), new
TriggerEvaluator.Builder().fileNumber(3).build());
+ manager(
+ sql.tableLoader(TABLE_NAME),
+ new TriggerEvaluator.Builder().dataFileSizeInBytes(3).build());
try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger>
testHarness =
harness(manager)) {
testHarness.open();
- addEventAndCheckResult(testHarness,
TableChange.builder().dataFileNum(1).build(), 0);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSizeInBytes(1L).build(), 0);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSizeInBytes(2L).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSizeInBytes(5L).build(), 2);
- addEventAndCheckResult(
- testHarness,
TableChange.builder().dataFileNum(1).deleteFileNum(1).build(), 1);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(3).build(), 2);
- addEventAndCheckResult(
- testHarness,
TableChange.builder().dataFileNum(5).deleteFileNum(7).build(), 3);
+ // No trigger in this case
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSizeInBytes(1L).build(), 2);
+
+ addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSizeInBytes(2L).build(), 3);
+ }
+ }
+
+ @Test
+ void testPosDeleteFileCount() throws Exception {
+ TriggerManager manager =
+ manager(
+ sql.tableLoader(TABLE_NAME),
+ new TriggerEvaluator.Builder().posDeleteFileCount(3).build());
+ try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger>
testHarness =
+ harness(manager)) {
+ testHarness.open();
+
+ addEventAndCheckResult(testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 0);
+ addEventAndCheckResult(testHarness,
TableChange.builder().posDeleteFileCount(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().posDeleteFileCount(3).build(), 2);
+ addEventAndCheckResult(testHarness,
TableChange.builder().posDeleteFileCount(10).build(), 3);
// No trigger in this case
- addEventAndCheckResult(testHarness,
TableChange.builder().dataFileNum(1).build(), 3);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(1).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 3);
- addEventAndCheckResult(testHarness,
TableChange.builder().dataFileNum(1).build(), 4);
+ addEventAndCheckResult(testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 4);
}
}
@Test
- void testFileSize() throws Exception {
+ void testPosDeleteRecordCount() throws Exception {
TriggerManager manager =
- manager(sql.tableLoader(TABLE_NAME), new
TriggerEvaluator.Builder().fileSize(3).build());
+ manager(
+ sql.tableLoader(TABLE_NAME),
+ new TriggerEvaluator.Builder().posDeleteRecordCount(3).build());
try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger>
testHarness =
harness(manager)) {
testHarness.open();
- addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSize(1L).build(), 0);
addEventAndCheckResult(
- testHarness,
TableChange.builder().dataFileSize(1L).deleteFileSize(1L).build(), 1);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileSize(3L).build(), 2);
+ testHarness, TableChange.builder().posDeleteRecordCount(1L).build(),
0);
addEventAndCheckResult(
- testHarness,
TableChange.builder().dataFileSize(5L).deleteFileSize(7L).build(), 3);
+ testHarness, TableChange.builder().posDeleteRecordCount(2L).build(),
1);
+ addEventAndCheckResult(
+ testHarness, TableChange.builder().posDeleteRecordCount(5L).build(),
2);
// No trigger in this case
- addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSize(1L).build(), 3);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileSize(1L).build(), 3);
+ addEventAndCheckResult(
+ testHarness, TableChange.builder().posDeleteRecordCount(1L).build(),
2);
- addEventAndCheckResult(testHarness,
TableChange.builder().dataFileSize(1L).build(), 4);
+ addEventAndCheckResult(
+ testHarness, TableChange.builder().posDeleteRecordCount(2L).build(),
3);
}
}
@Test
- void testDeleteFileNumber() throws Exception {
+ void testEqDeleteFileCount() throws Exception {
TriggerManager manager =
manager(
sql.tableLoader(TABLE_NAME),
- new TriggerEvaluator.Builder().deleteFileNumber(3).build());
+ new TriggerEvaluator.Builder().eqDeleteFileCount(3).build());
try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger>
testHarness =
harness(manager)) {
testHarness.open();
- addEventAndCheckResult(
- testHarness,
TableChange.builder().dataFileNum(3).deleteFileNum(1).build(), 0);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(2).build(), 1);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(3).build(), 2);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(10).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 0);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteFileCount(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteFileCount(3).build(), 2);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteFileCount(10).build(), 3);
+
+ // No trigger in this case
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 3);
+
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 4);
+ }
+ }
+
+ @Test
+ void testEqDeleteRecordCount() throws Exception {
+ TriggerManager manager =
+ manager(
+ sql.tableLoader(TABLE_NAME),
+ new TriggerEvaluator.Builder().eqDeleteRecordCount(3).build());
+ try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger>
testHarness =
+ harness(manager)) {
+ testHarness.open();
+
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteRecordCount(1L).build(), 0);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteRecordCount(2L).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteRecordCount(5L).build(), 2);
// No trigger in this case
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(1).build(), 3);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(1).build(), 3);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteRecordCount(1L).build(), 2);
- addEventAndCheckResult(testHarness,
TableChange.builder().deleteFileNum(1).build(), 4);
+ addEventAndCheckResult(testHarness,
TableChange.builder().eqDeleteRecordCount(2L).build(), 3);
}
}
@@ -183,7 +247,7 @@ class TestTriggerManager extends OperatorTestBase {
harness(manager)) {
testHarness.open();
- TableChange event =
TableChange.builder().dataFileSize(1).commitNum(1).build();
+ TableChange event =
TableChange.builder().dataFileCount(1).commitCount(1).build();
// Wait for some time
testHarness.processElement(event, EVENT_TIME);
@@ -225,7 +289,7 @@ class TestTriggerManager extends OperatorTestBase {
testHarness.open();
testHarness.processElement(
- TableChange.builder().dataFileSize(1).commitNum(1).build(),
EVENT_TIME);
+ TableChange.builder().dataFileCount(1).commitCount(1).build(),
EVENT_TIME);
assertThat(testHarness.extractOutputValues()).isEmpty();
@@ -240,7 +304,7 @@ class TestTriggerManager extends OperatorTestBase {
testHarness.open();
// Arrives the first real change which triggers the recovery process
- testHarness.processElement(TableChange.builder().commitNum(1).build(),
EVENT_TIME_2);
+ testHarness.processElement(TableChange.builder().commitCount(1).build(),
EVENT_TIME_2);
assertTriggers(
testHarness.extractOutputValues(),
Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime())));
@@ -261,11 +325,11 @@ class TestTriggerManager extends OperatorTestBase {
harness(manager)) {
testHarness.open();
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(2).build(), 1);
long currentTime = testHarness.getProcessingTime();
// No new fire yet
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(2).build(), 1);
// Check that the trigger fired after the delay
testHarness.setProcessingTime(currentTime + DELAY);
@@ -281,11 +345,11 @@ class TestTriggerManager extends OperatorTestBase {
harness(manager)) {
testHarness.open();
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(2).build(), 1);
// Create a lock to prevent execution, and check that there is no result
assertThat(lock.tryLock()).isTrue();
- addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(2).build(), 1);
+ addEventAndCheckResult(testHarness,
TableChange.builder().commitCount(2).build(), 1);
long currentTime = testHarness.getProcessingTime();
// Remove the lock, and still no trigger
@@ -331,7 +395,7 @@ class TestTriggerManager extends OperatorTestBase {
++processingTime;
expected.add(Trigger.recovery(processingTime));
testHarness.setProcessingTime(processingTime);
- testHarness.processElement(TableChange.builder().commitNum(2).build(),
processingTime);
+ testHarness.processElement(TableChange.builder().commitCount(2).build(),
processingTime);
assertTriggers(testHarness.extractOutputValues(), expected);
// Nothing happens until the recovery is finished
@@ -347,7 +411,7 @@ class TestTriggerManager extends OperatorTestBase {
// Still no results as the recovery is ongoing
++processingTime;
testHarness.setProcessingTime(processingTime);
- testHarness.processElement(TableChange.builder().commitNum(2).build(),
processingTime);
+ testHarness.processElement(TableChange.builder().commitCount(2).build(),
processingTime);
assertTriggers(testHarness.extractOutputValues(), expected);
// Simulate the action of removing lock and recoveryLock by downstream
lock cleaner when it
@@ -383,8 +447,8 @@ class TestTriggerManager extends OperatorTestBase {
lockFactory,
Lists.newArrayList(NAME_1, NAME_2),
Lists.newArrayList(
- new TriggerEvaluator.Builder().commitNumber(2).build(),
- new TriggerEvaluator.Builder().commitNumber(4).build()),
+ new TriggerEvaluator.Builder().commitCount(2).build(),
+ new TriggerEvaluator.Builder().commitCount(4).build()),
1L,
1L);
source
@@ -400,7 +464,7 @@ class TestTriggerManager extends OperatorTestBase {
jobClient = env.executeAsync();
// This one doesn't trigger - tests NOTHING_TO_TRIGGER
- source.sendRecord(TableChange.builder().commitNum(1).build());
+ source.sendRecord(TableChange.builder().commitCount(1).build());
Awaitility.await()
.until(
@@ -412,7 +476,7 @@ class TestTriggerManager extends OperatorTestBase {
});
// Trigger one of the tasks - tests TRIGGERED
- source.sendRecord(TableChange.builder().commitNum(1).build());
+ source.sendRecord(TableChange.builder().commitCount(1).build());
// Wait until we receive the trigger
assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
assertThat(
@@ -421,7 +485,7 @@ class TestTriggerManager extends OperatorTestBase {
lock.unlock();
// Trigger both of the tasks - tests TRIGGERED
- source.sendRecord(TableChange.builder().commitNum(2).build());
+ source.sendRecord(TableChange.builder().commitCount(2).build());
// Wait until we receive the trigger
assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
lock.unlock();
@@ -472,14 +536,14 @@ class TestTriggerManager extends OperatorTestBase {
jobClient = env.executeAsync();
// Start the first trigger
- source.sendRecord(TableChange.builder().commitNum(2).build());
+ source.sendRecord(TableChange.builder().commitCount(2).build());
assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
// Remove the lock to allow the next trigger
lock.unlock();
// The second trigger will be blocked
- source.sendRecord(TableChange.builder().commitNum(2).build());
+ source.sendRecord(TableChange.builder().commitCount(2).build());
Awaitility.await()
.until(
() ->
@@ -518,11 +582,11 @@ class TestTriggerManager extends OperatorTestBase {
jobClient = env.executeAsync();
// Start the first trigger - notice that we do not remove the lock after
the trigger
- source.sendRecord(TableChange.builder().commitNum(2).build());
+ source.sendRecord(TableChange.builder().commitCount(2).build());
assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
// The second trigger will be blocked by the lock
- source.sendRecord(TableChange.builder().commitNum(2).build());
+ source.sendRecord(TableChange.builder().commitCount(2).build());
Awaitility.await()
.until(
() ->
@@ -589,13 +653,13 @@ class TestTriggerManager extends OperatorTestBase {
tableLoader,
lockFactory,
Lists.newArrayList(NAME_1),
- Lists.newArrayList(new
TriggerEvaluator.Builder().commitNumber(2).build()),
+ Lists.newArrayList(new
TriggerEvaluator.Builder().commitCount(2).build()),
minFireDelayMs,
lockCheckDelayMs);
}
private TriggerManager manager(TableLoader tableLoader) {
- return manager(tableLoader, new
TriggerEvaluator.Builder().commitNumber(2).build());
+ return manager(tableLoader, new
TriggerEvaluator.Builder().commitCount(2).build());
}
private static void assertTriggers(List<Trigger> expected, List<Trigger>
actual) {