This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f2d62757e2 Flink: Port #10484 to v1.20 (#10989)
f2d62757e2 is described below
commit f2d62757e210df5498f147aa126926477ac6c156
Author: pvary <[email protected]>
AuthorDate: Fri Aug 23 15:42:24 2024 +0200
Flink: Port #10484 to v1.20 (#10989)
---
flink/v1.20/build.gradle | 1 +
.../maintenance/operator/JdbcLockFactory.java | 321 +++++++++++
.../flink/maintenance/operator/MonitorSource.java | 4 +-
.../flink/maintenance/operator/TableChange.java | 43 +-
.../operator/TableMaintenanceMetrics.java | 34 ++
.../flink/maintenance/operator/Trigger.java | 72 +++
.../maintenance/operator/TriggerEvaluator.java | 128 +++++
.../maintenance/operator/TriggerLockFactory.java | 63 +++
.../flink/maintenance/operator/TriggerManager.java | 339 +++++++++++
.../maintenance/operator/ConstantsForTests.java | 29 +
.../operator/MetricsReporterFactoryForTests.java | 153 +++++
.../maintenance/operator/OperatorTestBase.java | 107 +++-
.../maintenance/operator/TestJdbcLockFactory.java | 57 ++
.../maintenance/operator/TestLockFactoryBase.java | 80 +++
.../maintenance/operator/TestMonitorSource.java | 28 +-
.../maintenance/operator/TestTriggerManager.java | 622 +++++++++++++++++++++
...he.flink.metrics.reporter.MetricReporterFactory | 16 +
17 files changed, 2081 insertions(+), 16 deletions(-)
diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index f2e1fb51a1..294c88d907 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -119,6 +119,7 @@
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation libs.awaitility
testImplementation libs.assertj.core
+ testImplementation libs.sqlite.jdbc
}
test {
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
new file mode 100644
index 0000000000..21c8935abe
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.SQLTimeoutException;
+import java.sql.SQLTransientConnectionException;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcLockFactory implements TriggerLockFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcLockFactory.class);
+
+ @VisibleForTesting
+ static final String INIT_LOCK_TABLES_PROPERTY =
"flink-maintenance.lock.jdbc.init-lock-tables";
+
+ private static final String LOCK_TABLE_NAME = "flink_maintenance_lock";
+ private static final int LOCK_ID_MAX_LENGTH = 100;
+ private static final String CREATE_LOCK_TABLE_SQL =
+ String.format(
+ "CREATE TABLE %s "
+ + "(LOCK_TYPE CHAR(1) NOT NULL, "
+ + "LOCK_ID VARCHAR(%s) NOT NULL, "
+ + "INSTANCE_ID CHAR(36) NOT NULL, PRIMARY KEY (LOCK_TYPE,
LOCK_ID))",
+ LOCK_TABLE_NAME, LOCK_ID_MAX_LENGTH);
+
+ private static final String CREATE_LOCK_SQL =
+ String.format(
+ "INSERT INTO %s (LOCK_TYPE, LOCK_ID, INSTANCE_ID) VALUES (?, ?, ?)",
LOCK_TABLE_NAME);
+ private static final String GET_LOCK_SQL =
+ String.format("SELECT INSTANCE_ID FROM %s WHERE LOCK_TYPE=? AND
LOCK_ID=?", LOCK_TABLE_NAME);
+ private static final String DELETE_LOCK_SQL =
+ String.format(
+ "DELETE FROM %s WHERE LOCK_TYPE=? AND LOCK_ID=? AND INSTANCE_ID=?",
LOCK_TABLE_NAME);
+
+ private final String uri;
+ private final String lockId;
+ private final Map<String, String> properties;
+ private transient JdbcClientPool pool;
+
+  /**
+   * Creates a new {@link TriggerLockFactory}. The lockId should be unique between the users of the
+   * same uri.
+   *
+   * <p>No connection is opened here; the connection pool is only created by {@link #open()}.
+   *
+   * @param uri of the jdbc connection
+   * @param lockId which should identify the job and the table; must be shorter than the maximum
+   *     lock id length (100 characters)
+   * @param properties used for creating the jdbc connection pool
+   * @throws NullPointerException if any of the arguments is null
+   * @throws IllegalArgumentException if the lockId is too long
+   */
+  public JdbcLockFactory(String uri, String lockId, Map<String, String> properties) {
+    Preconditions.checkNotNull(uri, "JDBC connection URI is required");
+    Preconditions.checkNotNull(properties, "Properties map is required");
+    // Fail with a descriptive message instead of a bare NPE from lockId.length() below
+    Preconditions.checkNotNull(lockId, "Lock id is required");
+    Preconditions.checkArgument(
+        lockId.length() < LOCK_ID_MAX_LENGTH,
+        "Invalid prefix length: lockId should be shorter than %s",
+        LOCK_ID_MAX_LENGTH);
+    this.uri = uri;
+    this.lockId = lockId;
+    this.properties = properties;
+  }
+
+  /**
+   * Creates the JDBC client pool for the configured uri and properties. When the {@link
+   * #INIT_LOCK_TABLES_PROPERTY} property is set to true (it defaults to false), the lock table is
+   * also created if it is missing.
+   */
+  @Override
+  public void open() {
+    // NOTE(review): the first argument (1) is presumably the pool size - confirm in JdbcClientPool
+    this.pool = new JdbcClientPool(1, uri, properties);
+
+    if (PropertyUtil.propertyAsBoolean(properties, INIT_LOCK_TABLES_PROPERTY, false)) {
+      initializeLockTables();
+    }
+  }
+
+  /**
+   * Only used in testing to share the jdbc pool: reuses the pool of the other factory instead of
+   * creating a new one. The lock tables are not initialized by this method.
+   *
+   * @param other factory whose pool is reused
+   */
+  @VisibleForTesting
+  void open(JdbcLockFactory other) {
+    this.pool = other.pool;
+  }
+
+  /**
+   * Creates a handle for the maintenance lock of this factory's lockId. Only the handle is
+   * created here; nothing is written to the database until {@link Lock#tryLock()} is called.
+   *
+   * @return a lock handle of the maintenance type backed by this factory's connection pool
+   */
+  @Override
+  public Lock createLock() {
+    return new Lock(pool, lockId, Type.MAINTENANCE);
+  }
+
+  /**
+   * Creates a handle for the recovery lock of this factory's lockId. Only the handle is created
+   * here; nothing is written to the database until {@link Lock#tryLock()} is called.
+   *
+   * @return a lock handle of the recovery type backed by this factory's connection pool
+   */
+  @Override
+  public Lock createRecoveryLock() {
+    return new Lock(pool, lockId, Type.RECOVERY);
+  }
+
+  /**
+   * Closes the JDBC connection pool. Does nothing when the factory was never opened, so closing
+   * an unopened (for example freshly deserialized) factory does not fail.
+   *
+   * @throws IOException declared by {@link java.io.Closeable}
+   */
+  @Override
+  public void close() throws IOException {
+    // The pool is transient and only assigned by open(), so it may legitimately be null here
+    if (pool != null) {
+      pool.close();
+    }
+  }
+
+  /**
+   * Creates the lock table when the database metadata does not list a table with the lock table
+   * name. An already existing table is left untouched.
+   *
+   * @throws UncheckedSQLException if the metadata lookup or the table creation fails
+   * @throws UncheckedInterruptedException if the thread is interrupted; the interrupt flag is
+   *     restored before throwing
+   */
+  private void initializeLockTables() {
+    LOG.debug("Creating database tables (if missing) to store table maintenance locks");
+    try {
+      pool.run(
+          conn -> {
+            DatabaseMetaData dbMeta = conn.getMetaData();
+            // Close the metadata result set instead of leaking it on the pooled connection
+            try (ResultSet tableExists =
+                dbMeta.getTables(
+                    null /* catalog name */,
+                    null /* schemaPattern */,
+                    LOCK_TABLE_NAME /* tableNamePattern */,
+                    null /* types */)) {
+              if (tableExists.next()) {
+                LOG.debug("Flink maintenance lock table already exists");
+                return true;
+              }
+            }
+
+            LOG.info("Creating Flink maintenance lock table {}", LOCK_TABLE_NAME);
+            // Close the DDL statement as well; the pooled connection outlives this call
+            try (PreparedStatement createTable = conn.prepareStatement(CREATE_LOCK_TABLE_SQL)) {
+              return createTable.execute();
+            }
+          });
+
+    } catch (SQLTimeoutException e) {
+      throw new UncheckedSQLException(
+          e, "Cannot initialize JDBC table maintenance lock: Query timed out");
+    } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) {
+      throw new UncheckedSQLException(
+          e, "Cannot initialize JDBC table maintenance lock: Connection failed");
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Cannot initialize JDBC table maintenance lock");
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e, "Interrupted in call to initialize");
+    }
+  }
+
+ public static class Lock implements TriggerLockFactory.Lock {
+ private final JdbcClientPool pool;
+ private final String lockId;
+ private final Type type;
+
+    /**
+     * Creates a lock handle. Only stores the arguments; the database is not accessed.
+     *
+     * @param pool connection pool used for every lock operation
+     * @param lockId identifier stored in the LOCK_ID column
+     * @param type lock type whose key is stored in the LOCK_TYPE column
+     */
+    public Lock(JdbcClientPool pool, String lockId, Type type) {
+      this.pool = pool;
+      this.lockId = lockId;
+      this.type = type;
+    }
+
+    /**
+     * Tries to take the lock by inserting a row for this lock type and lockId with a freshly
+     * generated random instance id.
+     *
+     * <p>If a row already exists, returns false without attempting the insert. If the insert
+     * throws an SQLException, the stored instance id is read back: when it equals the id generated
+     * by this call the lock is treated as acquired, otherwise the failure is rethrown.
+     *
+     * @return true if exactly one row was inserted (or the insert is found to have succeeded
+     *     despite the exception), false if the lock is already held
+     * @throws UncheckedSQLException if the insert failed and the lock was not created by this call
+     * @throws UncheckedInterruptedException if the thread is interrupted; the interrupt flag is
+     *     restored before throwing
+     */
+    @Override
+    public boolean tryLock() {
+      if (isHeld()) {
+        LOG.info("Lock is already held for {}", this);
+        return false;
+      }
+
+      // Unique id of this acquisition attempt; stored in the INSTANCE_ID column
+      String newInstanceId = UUID.randomUUID().toString();
+      try {
+        return pool.run(
+            conn -> {
+              try (PreparedStatement sql = conn.prepareStatement(CREATE_LOCK_SQL)) {
+                sql.setString(1, type.key);
+                sql.setString(2, lockId);
+                sql.setString(3, newInstanceId);
+                int count = sql.executeUpdate();
+                LOG.info(
+                    "Created {} lock with instanceId {} with row count {}",
+                    this,
+                    newInstanceId,
+                    count);
+                return count == 1;
+              }
+            });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedInterruptedException(e, "Interrupted during tryLock");
+      } catch (SQLException e) {
+        // SQL exception happened when creating the lock. Check if the lock creation was
+        // successful behind the scenes.
+        if (newInstanceId.equals(instanceId())) {
+          return true;
+        } else {
+          throw new UncheckedSQLException(e, "Failed to create %s lock", this);
+        }
+      }
+    }
+
+    /**
+     * Checks whether a lock row exists for this lock type and lockId, regardless of which
+     * instance created it.
+     *
+     * @return true if the lock table contains a matching row
+     * @throws UncheckedSQLException if the query fails
+     * @throws UncheckedInterruptedException if the thread is interrupted; the interrupt flag is
+     *     restored before throwing
+     */
+    @SuppressWarnings("checkstyle:NestedTryDepth")
+    @Override
+    public boolean isHeld() {
+      try {
+        return pool.run(
+            connection -> {
+              try (PreparedStatement query = connection.prepareStatement(GET_LOCK_SQL)) {
+                query.setString(1, type.key);
+                query.setString(2, lockId);
+                try (ResultSet rows = query.executeQuery()) {
+                  // Any row means somebody holds the lock
+                  boolean lockRowExists = rows.next();
+                  return lockRowExists;
+                }
+              }
+            });
+      } catch (SQLException e) {
+        // SQL exception happened when getting lock information
+        throw new UncheckedSQLException(e, "Failed to get lock information for %s", this);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedInterruptedException(e, "Interrupted during isHeld");
+      }
+    }
+
+    /**
+     * Releases the lock by deleting the row for this lock type and lockId. The currently stored
+     * instance id is read first and the delete is restricted to that id. Does nothing if no lock
+     * row exists.
+     *
+     * @throws UncheckedSQLException if reading or deleting the lock row fails
+     * @throws UncheckedInterruptedException if the thread is interrupted; the interrupt flag is
+     *     restored before throwing
+     */
+    @SuppressWarnings("checkstyle:NestedTryDepth")
+    @Override
+    public void unlock() {
+      try {
+        // Possible concurrency issue:
+        // - `unlock` and `tryLock` happen at the same time when there is an existing lock
+        //
+        // Steps:
+        // 1. `unlock` removes the lock in the database, but there is a temporary connection failure
+        // 2. `tryLock` finds that there is no lock, so creates a new lock
+        // 3. `unlock` retries the lock removal and removes the new lock
+        //
+        // To prevent the situation above we fetch the current instanceId, and remove the lock
+        // only with the given id.
+        String instanceId = instanceId();
+
+        if (instanceId != null) {
+          pool.run(
+              conn -> {
+                try (PreparedStatement sql = conn.prepareStatement(DELETE_LOCK_SQL)) {
+                  sql.setString(1, type.key);
+                  sql.setString(2, lockId);
+                  sql.setString(3, instanceId);
+                  long count = sql.executeUpdate();
+                  LOG.info(
+                      "Deleted {} lock with instanceId {} with row count {}",
+                      this,
+                      instanceId,
+                      count);
+                } catch (SQLException e) {
+                  // SQL exception happened when deleting lock information
+                  throw new UncheckedSQLException(
+                      e, "Failed to delete %s lock with instanceId %s", this, instanceId);
+                }
+
+                return null;
+              });
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedInterruptedException(e, "Interrupted during unlock");
+      } catch (UncheckedSQLException e) {
+        // Already carries the detailed message created above; rethrow unchanged
+        throw e;
+      } catch (SQLException e) {
+        // SQL exception happened when getting/updating lock information
+        throw new UncheckedSQLException(e, "Failed to remove lock %s", this);
+      }
+    }
+
+    /**
+     * Returns a description containing the lock type and the lockId; used in log and error
+     * messages of this class.
+     */
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("type", type).add("lockId", lockId).toString();
+    }
+
+    /**
+     * Reads the instance id stored for this lock type and lockId. Used by both {@code tryLock}
+     * and {@code unlock}.
+     *
+     * @return the stored instance id, or null if there is no lock row
+     * @throws UncheckedSQLException if the query fails
+     * @throws UncheckedInterruptedException if the thread is interrupted; the interrupt flag is
+     *     restored before throwing
+     */
+    @SuppressWarnings("checkstyle:NestedTryDepth")
+    private String instanceId() {
+      try {
+        return pool.run(
+            conn -> {
+              try (PreparedStatement sql = conn.prepareStatement(GET_LOCK_SQL)) {
+                sql.setString(1, type.key);
+                sql.setString(2, lockId);
+                try (ResultSet rs = sql.executeQuery()) {
+                  if (rs.next()) {
+                    return rs.getString(1);
+                  } else {
+                    return null;
+                  }
+                }
+              } catch (SQLException e) {
+                // SQL exception happened when getting lock information. Report the whole lock
+                // (type and lockId), like isHeld() does, instead of only the type.
+                throw new UncheckedSQLException(e, "Failed to get lock information for %s", this);
+              }
+            });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        // This method is called from tryLock as well, so do not claim the caller was unlock
+        throw new UncheckedInterruptedException(e, "Interrupted during instanceId");
+      } catch (SQLException e) {
+        throw new UncheckedSQLException(e, "Failed to get lock information for %s", this);
+      }
+    }
+ }
+
+  /** Kind of lock; the key is the one character value stored in the LOCK_TYPE column. */
+  private enum Type {
+    MAINTENANCE("m"),
+    RECOVERY("r");
+
+    // Enum constants are shared singletons, so their state must not be reassignable
+    private final String key;
+
+    Type(String key) {
+      this.key = key;
+    }
+  }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
index d74b2349b1..89efffa15f 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
/** Monitors an Iceberg table for changes */
@Internal
-public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
+class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
private static final Logger LOG =
LoggerFactory.getLogger(MonitorSource.class);
private final TableLoader tableLoader;
@@ -58,7 +58,7 @@ public class MonitorSource extends
SingleThreadedIteratorSource<TableChange> {
* @param rateLimiterStrategy limits the frequency the table is checked
* @param maxReadBack sets the number of snapshots read before stopping
change collection
*/
- public MonitorSource(
+ MonitorSource(
TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long
maxReadBack) {
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy
should no be null");
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
index 452ed80ed0..7d0b94e97d 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
@@ -35,7 +35,7 @@ class TableChange {
private long deleteFileSize;
private int commitNum;
- TableChange(
+ private TableChange(
int dataFileNum, int deleteFileNum, long dataFileSize, long
deleteFileSize, int commitNum) {
this.dataFileNum = dataFileNum;
this.deleteFileNum = deleteFileNum;
@@ -67,6 +67,10 @@ class TableChange {
return new TableChange(0, 0, 0L, 0L, 0);
}
+ static Builder builder() {
+ return new Builder();
+ }
+
int dataFileNum() {
return dataFileNum;
}
@@ -130,4 +134,41 @@ class TableChange {
public int hashCode() {
return Objects.hash(dataFileNum, deleteFileNum, dataFileSize,
deleteFileSize, commitNum);
}
+
+  /**
+   * Builder for {@link TableChange}. Every value defaults to 0, so only the non-zero values need
+   * to be set before calling {@link #build()}.
+   */
+  static class Builder {
+    private int dataFileNum = 0;
+    private int deleteFileNum = 0;
+    private long dataFileSize = 0L;
+    private long deleteFileSize = 0L;
+    private int commitNum = 0;
+
+    /** Sets the data file number and returns this builder. */
+    public Builder dataFileNum(int newDataFileNum) {
+      this.dataFileNum = newDataFileNum;
+      return this;
+    }
+
+    /** Sets the delete file number and returns this builder. */
+    public Builder deleteFileNum(int newDeleteFileNum) {
+      this.deleteFileNum = newDeleteFileNum;
+      return this;
+    }
+
+    /** Sets the data file size and returns this builder. */
+    public Builder dataFileSize(long newDataFileSize) {
+      this.dataFileSize = newDataFileSize;
+      return this;
+    }
+
+    /** Sets the delete file size and returns this builder. */
+    public Builder deleteFileSize(long newDeleteFileSize) {
+      this.deleteFileSize = newDeleteFileSize;
+      return this;
+    }
+
+    /** Sets the commit number and returns this builder. */
+    public Builder commitNum(int newCommitNum) {
+      this.commitNum = newCommitNum;
+      return this;
+    }
+
+    /** Creates a new {@link TableChange} from the values collected so far. */
+    public TableChange build() {
+      return new TableChange(dataFileNum, deleteFileNum, dataFileSize, deleteFileSize, commitNum);
+    }
+  }
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
new file mode 100644
index 0000000000..ec0fd920c3
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+/** String constants for the metrics of the table maintenance operators. Not instantiable. */
+public class TableMaintenanceMetrics {
+  // NOTE(review): presumably the key and the default value of the metric group - the usage is
+  // not visible in this file, confirm against TriggerManager
+  public static final String GROUP_KEY = "maintenanceTask";
+  public static final String GROUP_VALUE_DEFAULT = "maintenanceTask";
+
+  // TriggerManager metrics
+  public static final String RATE_LIMITER_TRIGGERED = "rateLimiterTriggered";
+  public static final String CONCURRENT_RUN_THROTTLED = "concurrentRunThrottled";
+  public static final String TRIGGERED = "triggered";
+  public static final String NOTHING_TO_TRIGGER = "nothingToTrigger";
+
+  private TableMaintenanceMetrics() {
+    // do not instantiate
+  }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java
new file mode 100644
index 0000000000..85c6c8dbdd
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Immutable event carrying a timestamp and either a table with a task id (see {@link #create})
+ * or a recovery marker without them (see {@link #recovery}).
+ */
+@Internal
+class Trigger {
+  private final long timestamp;
+  // null for recovery triggers
+  private final SerializableTable table;
+  // null for recovery triggers
+  private final Integer taskId;
+  private final boolean isRecovery;
+
+  private Trigger(long timestamp, SerializableTable table, Integer taskId, boolean isRecovery) {
+    this.timestamp = timestamp;
+    this.table = table;
+    this.taskId = taskId;
+    this.isRecovery = isRecovery;
+  }
+
+  /** Creates a non-recovery trigger for the given table and task id. */
+  static Trigger create(long timestamp, SerializableTable table, int taskId) {
+    return new Trigger(timestamp, table, taskId, false);
+  }
+
+  /** Creates a recovery trigger; its table and task id are both null. */
+  static Trigger recovery(long timestamp) {
+    return new Trigger(timestamp, null, null, true);
+  }
+
+  /** Returns the timestamp given at creation. */
+  long timestamp() {
+    return timestamp;
+  }
+
+  /** Returns the table, or null for a recovery trigger. */
+  SerializableTable table() {
+    return table;
+  }
+
+  /** Returns the task id, or null for a recovery trigger. */
+  Integer taskId() {
+    return taskId;
+  }
+
+  /** Returns true if this trigger was created by {@link #recovery(long)}. */
+  boolean isRecovery() {
+    return isRecovery;
+  }
+
+  @Override
+  public String toString() {
+    // Print only the table name (null safe) instead of the whole table object
+    return MoreObjects.toStringHelper(this)
+        .add("timestamp", timestamp)
+        .add("table", table == null ? null : table.name())
+        .add("taskId", taskId)
+        .add("isRecovery", isRecovery)
+        .toString();
+  }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
new file mode 100644
index 0000000000..37e4e3afd4
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decides whether accumulated table changes should fire a trigger. The evaluator holds a list of
+ * conditions created by the {@link Builder}; a check succeeds as soon as any one of them holds.
+ */
+@Internal
+class TriggerEvaluator implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(TriggerEvaluator.class);
+  private final List<Predicate> predicates;
+
+  private TriggerEvaluator(List<Predicate> predicates) {
+    Preconditions.checkArgument(!predicates.isEmpty(), "Provide at least 1 condition.");
+
+    this.predicates = predicates;
+  }
+
+  /**
+   * Evaluates the conditions in order and stops at the first one which holds.
+   *
+   * @param event the accumulated table change
+   * @param lastTimeMs reference time the timeout condition is measured from, in milliseconds
+   * @param currentTimeMs current time in milliseconds
+   * @return true if at least one condition holds
+   */
+  boolean check(TableChange event, long lastTimeMs, long currentTimeMs) {
+    boolean matched = false;
+    for (Predicate condition : predicates) {
+      boolean conditionHolds;
+      try {
+        conditionHolds = condition.evaluate(event, lastTimeMs, currentTimeMs);
+      } catch (Exception e) {
+        throw new RuntimeException("Error accessing state", e);
+      }
+
+      if (conditionHolds) {
+        matched = true;
+        break;
+      }
+    }
+
+    LOG.debug(
+        "Checking event: {}, at {}, last: {} with result: {}",
+        event,
+        currentTimeMs,
+        lastTimeMs,
+        matched);
+    return matched;
+  }
+
+  /** Collects the optional thresholds; only the thresholds which were set become conditions. */
+  static class Builder implements Serializable {
+    private Integer commitNumber;
+    private Integer fileNumber;
+    private Long fileSize;
+    private Integer deleteFileNumber;
+    private Duration timeout;
+
+    /** Fires when the commit number of the change reaches the given value. */
+    Builder commitNumber(int newCommitNumber) {
+      this.commitNumber = newCommitNumber;
+      return this;
+    }
+
+    /** Fires when the data and delete file numbers together reach the given value. */
+    Builder fileNumber(int newFileNumber) {
+      this.fileNumber = newFileNumber;
+      return this;
+    }
+
+    /** Fires when the data and delete file sizes together reach the given value. */
+    Builder fileSize(long newFileSize) {
+      this.fileSize = newFileSize;
+      return this;
+    }
+
+    /** Fires when the delete file number of the change reaches the given value. */
+    Builder deleteFileNumber(int newDeleteFileNumber) {
+      this.deleteFileNumber = newDeleteFileNumber;
+      return this;
+    }
+
+    /** Fires when at least the given time has passed between the last and the current time. */
+    Builder timeout(Duration newTimeout) {
+      this.timeout = newTimeout;
+      return this;
+    }
+
+    /**
+     * Creates the evaluator from the thresholds which were set.
+     *
+     * @throws IllegalArgumentException if no threshold was set
+     */
+    TriggerEvaluator build() {
+      List<Predicate> conditions = Lists.newArrayList();
+      if (commitNumber != null) {
+        conditions.add(
+            (tableChange, unused, unused2) -> tableChange.commitNum() >= commitNumber);
+      }
+
+      if (fileNumber != null) {
+        conditions.add(
+            (tableChange, unused, unused2) ->
+                tableChange.dataFileNum() + tableChange.deleteFileNum() >= fileNumber);
+      }
+
+      if (fileSize != null) {
+        conditions.add(
+            (tableChange, unused, unused2) ->
+                tableChange.dataFileSize() + tableChange.deleteFileSize() >= fileSize);
+      }
+
+      if (deleteFileNumber != null) {
+        conditions.add(
+            (tableChange, unused, unused2) -> tableChange.deleteFileNum() >= deleteFileNumber);
+      }
+
+      if (timeout != null) {
+        conditions.add(
+            (tableChange, lastTimeMs, currentTimeMs) ->
+                currentTimeMs - lastTimeMs >= timeout.toMillis());
+      }
+
+      return new TriggerEvaluator(conditions);
+    }
+  }
+
+  /** A single serializable trigger condition. */
+  private interface Predicate extends Serializable {
+    boolean evaluate(TableChange event, long lastTimeMs, long currentTimeMs);
+  }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java
new file mode 100644
index 0000000000..329223d27c
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import org.apache.flink.annotation.Experimental;
+
+/** Lock interface for handling locks for the Flink Table Maintenance jobs. */
+@Experimental
+public interface TriggerLockFactory extends Serializable, Closeable {
+  /**
+   * Prepares the factory for use.
+   *
+   * <p>NOTE(review): presumably this has to be called before the locks are used, since the
+   * factory is serializable and resources cannot be created earlier - confirm against the callers.
+   */
+  void open();
+
+  /** Creates the lock used for the maintenance tasks. */
+  Lock createLock();
+
+  /** Creates the lock used for recovery. */
+  Lock createRecoveryLock();
+
+  interface Lock {
+    /**
+     * Tries to acquire a lock with a given key. Anyone already holding a lock would prevent
+     * acquiring this lock. Not reentrant.
+     *
+     * <p>Called by {@link TriggerManager}. Implementations can assume that there are no
+     * concurrent calls for this method.
+     *
+     * @return <code>true</code> if the lock is acquired by this job, <code>false</code> if the lock
+     *     is already held by someone
+     */
+    boolean tryLock();
+
+    /**
+     * Checks if the lock is already taken.
+     *
+     * @return <code>true</code> if the lock is held by someone
+     */
+    boolean isHeld();
+
+    // TODO: Fix the link to the LockRemover when we have a final name and implementation
+    /**
+     * Releases the lock. Should not fail if the lock is not held by anyone.
+     *
+     * <p>Called by LockRemover. Implementations can assume that there are no concurrent calls for
+     * this method.
+     */
+    void unlock();
+  }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
new file mode 100644
index 0000000000..f4c3c1d47c
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are
+ * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link
+ * TriggerLockFactory.Lock}. The current implementation only handles conflicts within a single job.
+ * Users should avoid scheduling maintenance for the same table in different Flink jobs.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunThrottledCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean shouldRestoreTasks = false;
+  private transient boolean inited = false;
+  // To keep the task scheduling fair we keep the last triggered task position in memory.
+  // If we find a task to trigger, then we run it, but after it is finished, we start from the given
+  // position to prevent "starvation" of the tasks.
+  // When there is nothing to trigger, we start from the beginning, as the order of the tasks might
+  // be important (RewriteDataFiles first, and then RewriteManifestFiles later)
+  private transient int startsFrom = 0;
+  private transient boolean triggered = false;
+
+  /**
+   * Creates the trigger manager.
+   *
+   * @param tableLoader used to load the table attached to the emitted triggers
+   * @param lockFactory provides the maintenance lock and the recovery lock
+   * @param taskNames names of the tasks, used as metric group values; the position in the list
+   *     identifies the task
+   * @param evaluators one evaluator per task, in the same order as <code>taskNames</code>
+   * @param minFireDelayMs delay before the next evaluation after a trigger is fired
+   * @param lockCheckDelayMs delay before the next evaluation when a lock could not be acquired
+   */
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should not be null");
+    Preconditions.checkArgument(
+        taskNames != null && !taskNames.isEmpty(), "Invalid task names: null or empty");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null or empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  /** Registers the metrics and the state descriptors, then opens the table loader. */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunThrottledCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    // One "triggered" counter per task, in the same order as the task names
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  /** Copies the in-memory values to the Flink state, but only once they have been initialized. */
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+      LOG.info(
+          "Storing state: nextEvaluationTime {}, accumulatedChanges {}, lastTriggerTimes {}",
+          nextEvaluationTime,
+          accumulatedChanges,
+          lastTriggerTimes);
+    } else {
+      LOG.info("Not initialized, state is not stored");
+    }
+  }
+
+  /**
+   * Opens the lock factory and creates both locks. Only remembers that the job was restored; the
+   * recovery itself is started by the first element or timer, see <code>init</code>.
+   */
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    lockFactory.open();
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      shouldRestoreTasks = true;
+    }
+  }
+
+  /**
+   * Merges the incoming change into the accumulated change of every task. The triggers are
+   * evaluated immediately if no evaluation is scheduled; otherwise the change is only accumulated
+   * and the rate limiter counter is increased.
+   */
+  @Override
+  public void processElement(TableChange change, Context ctx, Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    if (nextEvaluationTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}",
+          current,
+          nextEvaluationTime,
+          accumulatedChanges);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  /** Clears the pending evaluation time and evaluates the triggers again. */
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception {
+    init(out, ctx.timerService());
+    this.nextEvaluationTime = null;
+    checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out);
+  }
+
+  /** Closes the table loader and the lock factory. */
+  @Override
+  public void close() throws IOException {
+    try {
+      tableLoader.close();
+    } finally {
+      // Release the lock factory resources even if closing the table loader fails
+      lockFactory.close();
+    }
+  }
+
+  /**
+   * Emits a trigger for the next task whose evaluator fires, if the maintenance lock could be
+   * acquired. Whenever the evaluation has to be repeated later (recovery in progress, trigger
+   * fired, lock not available) a processing time timer is registered through {@link
+   * #schedule(TimerService, long)}.
+   *
+   * @param current the current processing time
+   * @param timerService used to register the timer for the next evaluation
+   * @param out collector for the emitted triggers
+   */
+  private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) {
+    if (shouldRestoreTasks) {
+      if (recoveryLock.isHeld()) {
+        // Recovered tasks in progress. Skip trigger check
+        LOG.debug("The recovery lock is still held at {}", current);
+        schedule(timerService, current + lockCheckDelayMs);
+        return;
+      } else {
+        LOG.info("The recovery is finished at {}", current);
+        shouldRestoreTasks = false;
+      }
+    }
+
+    Integer taskToStart =
+        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, startsFrom);
+    if (taskToStart == null) {
+      // Nothing to execute
+      if (!triggered) {
+        nothingToTriggerCounter.inc();
+        LOG.debug("Nothing to execute at {} for collected: {}", current, accumulatedChanges);
+      } else {
+        LOG.debug("Execution check finished");
+      }
+
+      // Next time start from the beginning
+      startsFrom = 0;
+      triggered = false;
+      return;
+    }
+
+    if (lock.tryLock()) {
+      TableChange change = accumulatedChanges.get(taskToStart);
+      SerializableTable table =
+          (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
+      out.collect(Trigger.create(current, table, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, table.name());
+      triggerCounters.get(taskToStart).inc();
+      accumulatedChanges.set(taskToStart, TableChange.empty());
+      lastTriggerTimes.set(taskToStart, current);
+      schedule(timerService, current + minFireDelayMs);
+      startsFrom = (taskToStart + 1) % evaluators.size();
+      triggered = true;
+    } else {
+      // A task is already running, waiting for it to finish
+      LOG.info("Failed to acquire lock. Delaying task to {}", current + lockCheckDelayMs);
+
+      startsFrom = taskToStart;
+      concurrentRunThrottledCounter.inc();
+      schedule(timerService, current + lockCheckDelayMs);
+    }
+  }
+
+  /** Remembers the next evaluation time and registers the matching processing time timer. */
+  private void schedule(TimerService timerService, long time) {
+    this.nextEvaluationTime = time;
+    timerService.registerProcessingTimeTimer(time);
+  }
+
+  /**
+   * Finds the first task whose evaluator fires, checking every task once in a circular order
+   * starting from <code>startPos</code>.
+   *
+   * @return the position of the task to trigger, or <code>null</code> if no evaluator fired
+   */
+  private static Integer nextTrigger(
+      List<TriggerEvaluator> evaluators,
+      List<TableChange> changes,
+      List<Long> lastTriggerTimes,
+      long currentTime,
+      int startPos) {
+    int current = startPos;
+    do {
+      if (evaluators
+          .get(current)
+          .check(changes.get(current), lastTriggerTimes.get(current), currentTime)) {
+        return current;
+      }
+
+      current = (current + 1) % evaluators.size();
+    } while (current != startPos);
+
+    return null;
+  }
+
+  /**
+   * Loads the in-memory values from the Flink state when the first element or timer arrives, and
+   * starts the recovery if the job state was restored. Does nothing on later calls.
+   */
+  private void init(Collector<Trigger> out, TimerService timerService) throws Exception {
+    if (!inited) {
+      long current = timerService.currentProcessingTime();
+
+      // Initialize from state
+      this.nextEvaluationTime = nextEvaluationTimeState.value();
+      this.accumulatedChanges = Lists.newArrayList(accumulatedChangesState.get());
+      this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get());
+
+      // Initialize if the state was empty
+      if (accumulatedChanges.isEmpty()) {
+        for (int i = 0; i < evaluators.size(); ++i) {
+          accumulatedChanges.add(TableChange.empty());
+          lastTriggerTimes.add(current);
+        }
+      }
+
+      if (shouldRestoreTasks) {
+        // When the job state is restored, there could be ongoing tasks.
+        // To prevent collision with the new triggers the following is done:
+        //  - add a recovery lock
+        //  - fire a recovery trigger
+        // This ensures that the tasks of the previous trigger are executed, and the lock is removed
+        // in the end. The result of the 'tryLock' is ignored as an already existing lock prevents
+        // collisions as well.
+        recoveryLock.tryLock();
+        out.collect(Trigger.recovery(current));
+        if (nextEvaluationTime == null) {
+          schedule(timerService, current + minFireDelayMs);
+        }
+      }
+
+      inited = true;
+    }
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java
new file mode 100644
index 0000000000..36e162d4f0
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+/**
+ * Holder of constant values shared by the tests of this package. Not instantiable.
+ *
+ * <p>NOTE(review): judging by the names, the long values are presumably used as event timestamps
+ * by the tests - confirm against the callers.
+ */
+class ConstantsForTests {
+ public static final long EVENT_TIME = 10L;
+ static final long EVENT_TIME_2 = 11L;
+ static final String DUMMY_NAME = "dummy";
+
+ private ConstantsForTests() {
+ // Do not instantiate
+ }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java
new file mode 100644
index 0000000000..7a523035b7
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * {@link MetricReporterFactory} for tests. Every created reporter is the same shared instance,
+ * which collects the counters and gauges whose metric name is one of the values declared in {@link
+ * TableMaintenanceMetrics}. The collected metrics are kept in static maps and are exposed under
+ * the simplified name <code>group1.group2.group3</code> extracted by the <code>FULL_METRIC_NAME
+ * </code> pattern.
+ */
+public class MetricsReporterFactoryForTests implements MetricReporterFactory {
+  private static final TestMetricsReporter INSTANCE = new TestMetricsReporter();
+  private static final Pattern FULL_METRIC_NAME =
+      Pattern.compile(
+          "\\.taskmanager\\.[^.]+\\.[^.]+\\.([^.]+)\\.\\d+\\."
+              + TableMaintenanceMetrics.GROUP_KEY
+              + "\\.([^.]+)\\.([^.]+)");
+
+  private static Map<String, Counter> counters = Maps.newConcurrentMap();
+  private static Map<String, Gauge<?>> gauges = Maps.newConcurrentMap();
+  private static Set<String> monitoredMetricNames;
+
+  /** Collects the values of the fields declared in {@link TableMaintenanceMetrics}. */
+  public MetricsReporterFactoryForTests() {
+    monitoredMetricNames =
+        Arrays.stream(TableMaintenanceMetrics.class.getDeclaredFields())
+            .map(
+                f -> {
+                  try {
+                    return f.get(null).toString();
+                  } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toSet());
+  }
+
+  @Override
+  public MetricReporter createMetricReporter(Properties properties) {
+    return INSTANCE;
+  }
+
+  /** Forgets every counter and gauge collected so far. */
+  public static void reset() {
+    counters = Maps.newConcurrentMap();
+    gauges = Maps.newConcurrentMap();
+  }
+
+  /**
+   * Returns the current value of a collected counter.
+   *
+   * @param name the simplified metric name
+   * @return the counter value, or <code>null</code> if no such counter was collected
+   */
+  public static Long counter(String name) {
+    return counterValues().get(name);
+  }
+
+  /**
+   * Returns the current value of a collected gauge.
+   *
+   * @param name the simplified metric name
+   * @return the gauge value, or <code>null</code> if no such gauge was collected
+   */
+  public static Long gauge(String name) {
+    return gaugeValues().get(name);
+  }
+
+  /**
+   * Asserts that the collected gauges match the expected ones. Metrics with <code>-1</code> as the
+   * expected value are left out of the comparison.
+   */
+  public static void assertGauges(Map<String, Long> expected) {
+    assertThat(filter(gaugeValues(), expected)).isEqualTo(filter(expected, expected));
+  }
+
+  /**
+   * Asserts that the collected counters match the expected ones. Metrics with <code>-1</code> as
+   * the expected value are left out of the comparison.
+   */
+  public static void assertCounters(Map<String, Long> expected) {
+    assertThat(filter(counterValues(), expected)).isEqualTo(filter(expected, expected));
+  }
+
+  private static Map<String, Long> gaugeValues() {
+    return gauges.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> longName(entry.getKey()), entry -> (Long) entry.getValue().getValue()));
+  }
+
+  private static Map<String, Long> counterValues() {
+    return counters.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> longName(entry.getKey()), entry -> entry.getValue().getCount()));
+  }
+
+  /**
+   * Keeps the entries of <code>original</code> whose key is either missing from <code>filter
+   * </code>, or is mapped there to a value other than <code>-1</code>.
+   */
+  private static Map<String, Long> filter(Map<String, Long> original, Map<String, Long> filter) {
+    return original.entrySet().stream()
+        .filter(
+            entry -> {
+              Long filterValue = filter.get(entry.getKey());
+              return filterValue == null || filterValue != -1;
+            })
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  /** Joins the three groups captured by <code>FULL_METRIC_NAME</code> with dots. */
+  private static String longName(String fullName) {
+    Matcher matcher = FULL_METRIC_NAME.matcher(fullName);
+    if (!matcher.matches()) {
+      throw new RuntimeException(String.format("Can't parse simplified metrics name %s", fullName));
+    }
+
+    return matcher.group(1) + "." + matcher.group(2) + "." + matcher.group(3);
+  }
+
+  private static class TestMetricsReporter implements MetricReporter {
+    @Override
+    public void open(MetricConfig config) {
+      // do nothing
+    }
+
+    @Override
+    public void close() {
+      // do nothing
+    }
+
+    @Override
+    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+      // Only the table maintenance metrics are collected, keyed by their full identifier
+      if (monitoredMetricNames.contains(metricName)) {
+        if (metric instanceof Counter) {
+          counters.put(group.getMetricIdentifier(metricName), (Counter) metric);
+        }
+
+        if (metric instanceof Gauge) {
+          gauges.put(group.getMetricIdentifier(metricName), (Gauge<?>) metric);
+        }
+      }
+    }
+
+    @Override
+    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+      // do nothing
+    }
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 272e0b693f..2258530865 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -20,16 +20,24 @@ package org.apache.iceberg.flink.maintenance.operator;
import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import java.io.File;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.iceberg.flink.FlinkCatalogFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.extension.RegisterExtension;
class OperatorTestBase {
private static final int NUMBER_TASK_MANAGERS = 1;
private static final int SLOTS_PER_TASK_MANAGER = 8;
+ private static final TriggerLockFactory.Lock MAINTENANCE_LOCK = new
MemoryLock();
+ private static final TriggerLockFactory.Lock RECOVERY_LOCK = new
MemoryLock();
static final String TABLE_NAME = "test_table";
@@ -39,7 +47,7 @@ class OperatorTestBase {
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUMBER_TASK_MANAGERS)
.setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
- .setConfiguration(new
Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG))
+ .setConfiguration(config())
.build());
@RegisterExtension
@@ -48,4 +56,101 @@ class OperatorTestBase {
"catalog",
ImmutableMap.of("type", "iceberg",
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"),
"db");
+
+  /**
+   * Creates the cluster configuration: the classloader check is disabled, and {@link
+   * MetricsReporterFactoryForTests} is registered as the factory of the "test_reporter" metric
+   * reporter.
+   */
+  private static Configuration config() {
+    Configuration config = new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG);
+    MetricOptions.forReporter(config, "test_reporter")
+        .set(MetricOptions.REPORTER_FACTORY_CLASS, MetricsReporterFactoryForTests.class.getName());
+    return config;
+  }
+
+ /**
+ * Creates a lock factory backed by the two static in-memory locks of this class. Every factory
+ * returned by this method hands out the same lock instances, and opening any of them releases
+ * both locks.
+ */
+ protected static TriggerLockFactory lockFactory() {
+ return new TriggerLockFactory() {
+ @Override
+ public void open() {
+ // Start from a clean state: release whatever was left locked
+ MAINTENANCE_LOCK.unlock();
+ RECOVERY_LOCK.unlock();
+ }
+
+ @Override
+ public Lock createLock() {
+ return MAINTENANCE_LOCK;
+ }
+
+ @Override
+ public Lock createRecoveryLock() {
+ return RECOVERY_LOCK;
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ };
+ }
+
+  /**
+   * Close the {@link JobClient} and wait for the job closure. If the savepointDir is specified, it
+   * stops the job with a savepoint.
+   *
+   * <p>The savepoint is detected by waiting until the savepointDir contains exactly one
+   * subdirectory, so the directory should not contain other subdirectories when this method is
+   * called.
+   *
+   * @param jobClient the job to close
+   * @param savepointDir the savepointDir to store the last savepoint. If <code>null</code> then
+   *     stop without a savepoint.
+   * @return configuration for restarting the job from the savepoint (empty if no savepoint was
+   *     requested), or <code>null</code> if the jobClient is <code>null</code>
+   */
+  public static Configuration closeJobClient(JobClient jobClient, File savepointDir) {
+    Configuration conf = new Configuration();
+    if (jobClient != null) {
+      if (savepointDir != null) {
+        // Stop with savepoint
+        jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL);
+        // Wait until the savepoint is created and the job has been stopped
+        Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1);
+        conf.set(
+            SavepointConfigOptions.SAVEPOINT_PATH,
+            savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath());
+      } else {
+        jobClient.cancel();
+      }
+
+      // Wait until the job has been stopped
+      Awaitility.await().until(() -> jobClient.getJobStatus().get().isTerminalState());
+      return conf;
+    }
+
+    return null;
+  }
+
+ /**
+ * Close the {@link JobClient} without a savepoint and wait for the job closure. Delegates to
+ * the two-argument variant with a <code>null</code> savepointDir.
+ *
+ * @param jobClient the job to close
+ */
+ public static void closeJobClient(JobClient jobClient) {
+ closeJobClient(jobClient, null);
+ }
+
+  /**
+   * In-memory, non-reentrant {@link TriggerLockFactory.Lock} for tests, backed by a single boolean
+   * flag.
+   */
+  private static class MemoryLock implements TriggerLockFactory.Lock {
+    // The lock instances are static and are handed to the operators through the lock factory, so
+    // the flag could be accessed from different threads. Volatile makes the changes visible
+    // between them. The check-then-set in tryLock is still not atomic, which is fine as long as
+    // tryLock is not called concurrently.
+    volatile boolean locked = false;
+
+    @Override
+    public boolean tryLock() {
+      if (locked) {
+        return false;
+      } else {
+        locked = true;
+        return true;
+      }
+    }
+
+    @Override
+    public boolean isHeld() {
+      return locked;
+    }
+
+    @Override
+    public void unlock() {
+      locked = false;
+    }
+  }
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java
new file mode 100644
index 0000000000..051d09d92b
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static
org.apache.iceberg.flink.maintenance.operator.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.Test;
+
+/** Runs the common lock factory tests against a {@link JdbcLockFactory} using SQLite. */
+class TestJdbcLockFactory extends TestLockFactoryBase {
+  @Override
+  TriggerLockFactory lockFactory() {
+    return lockFactory("tableName");
+  }
+
+  /** Locks created for different table names should not block each other. */
+  @Test
+  void testMultiTableLock() {
+    JdbcLockFactory other = lockFactory("tableName2");
+    // Open the second factory based on the already opened one
+    other.open((JdbcLockFactory) this.lockFactory);
+    TriggerLockFactory.Lock lock1 = lockFactory.createLock();
+    TriggerLockFactory.Lock lock2 = other.createLock();
+    assertThat(lock1.tryLock()).isTrue();
+    assertThat(lock2.tryLock()).isTrue();
+  }
+
+  /**
+   * Creates a lock factory for the given table name. Every call generates a new JDBC URI with a
+   * random suffix, and asks the factory to initialize the lock tables.
+   */
+  private JdbcLockFactory lockFactory(String tableName) {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
+    properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+    properties.put(INIT_LOCK_TABLES_PROPERTY, "true");
+
+    return new JdbcLockFactory(
+        "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""),
+        tableName,
+        properties);
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java
new file mode 100644
index 0000000000..bf9e86f253
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Contract tests for {@link TriggerLockFactory} implementations. Subclasses provide the factory
+ * under test, which is opened before and closed after every test.
+ */
+abstract class TestLockFactoryBase {
+  protected TriggerLockFactory lockFactory;
+
+  /** Creates the factory under test; called once before every test. */
+  abstract TriggerLockFactory lockFactory();
+
+  @BeforeEach
+  void before() {
+    this.lockFactory = lockFactory();
+    lockFactory.open();
+  }
+
+  @AfterEach
+  void after() throws IOException {
+    lockFactory.close();
+  }
+
+  /** Once the lock is taken, nobody can take it again, not even through the same instance. */
+  @Test
+  void testTryLock() {
+    TriggerLockFactory.Lock first = lockFactory.createLock();
+    TriggerLockFactory.Lock second = lockFactory.createLock();
+    assertThat(first.tryLock()).isTrue();
+    assertThat(first.tryLock()).isFalse();
+    assertThat(second.tryLock()).isFalse();
+  }
+
+  /** A released lock can be taken again. */
+  @Test
+  void testUnLock() {
+    TriggerLockFactory.Lock maintenanceLock = lockFactory.createLock();
+    assertThat(maintenanceLock.tryLock()).isTrue();
+
+    maintenanceLock.unlock();
+    assertThat(maintenanceLock.tryLock()).isTrue();
+  }
+
+  /** The maintenance lock and the recovery lock can be held at the same time. */
+  @Test
+  void testNoConflictWithRecoveryLock() {
+    TriggerLockFactory.Lock maintenanceLock = lockFactory.createLock();
+    TriggerLockFactory.Lock recoveryLock = lockFactory.createRecoveryLock();
+    assertThat(maintenanceLock.tryLock()).isTrue();
+    assertThat(recoveryLock.tryLock()).isTrue();
+  }
+
+  /** Releasing a lock which is not held is harmless, and the lock still works afterwards. */
+  @Test
+  void testDoubleUnLock() {
+    TriggerLockFactory.Lock maintenanceLock = lockFactory.createLock();
+    assertThat(maintenanceLock.tryLock()).isTrue();
+
+    maintenanceLock.unlock();
+    maintenanceLock.unlock();
+    assertThat(maintenanceLock.tryLock()).isTrue();
+    assertThat(maintenanceLock.tryLock()).isFalse();
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
index 876d642145..8c02601025 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.maintenance.operator;
-import static
org.apache.iceberg.flink.maintenance.operator.FlinkStreamingTestUtils.closeJobClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
@@ -161,7 +161,8 @@ class TestMonitorSource extends OperatorTestBase {
}
// The first non-empty event should contain the expected value
- return newEvent.equals(new TableChange(1, 0, size, 0L, 1));
+ return newEvent.equals(
+
TableChange.builder().dataFileNum(1).dataFileSize(size).commitNum(1).build());
});
} finally {
closeJobClient(jobClient);
@@ -348,15 +349,18 @@ class TestMonitorSource extends OperatorTestBase {
List<DeleteFile> deleteFiles =
Lists.newArrayList(table.currentSnapshot().addedDeleteFiles(table.io()).iterator());
- long dataSize = dataFiles.stream().mapToLong(d ->
d.fileSizeInBytes()).sum();
- long deleteSize = deleteFiles.stream().mapToLong(d ->
d.fileSizeInBytes()).sum();
- boolean hasDelete =
table.currentSnapshot().addedDeleteFiles(table.io()).iterator().hasNext();
-
- return new TableChange(
- previous.dataFileNum() + dataFiles.size(),
- previous.deleteFileNum() + deleteFiles.size(),
- previous.dataFileSize() + dataSize,
- previous.deleteFileSize() + deleteSize,
- previous.commitNum() + 1);
+ long dataSize =
dataFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
+ long deleteSize =
deleteFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
+
+ TableChange newChange = previous.copy();
+ newChange.merge(
+ TableChange.builder()
+ .dataFileNum(dataFiles.size())
+ .dataFileSize(dataSize)
+ .deleteFileNum(deleteFiles.size())
+ .deleteFileSize(deleteSize)
+ .commitNum(1)
+ .build());
+ return newChange;
}
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
new file mode 100644
index 0000000000..55e64f3e84
--- /dev/null
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static
org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME;
+import static
org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME;
+import static
org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class TestTriggerManager extends OperatorTestBase {
+ // Delay (ms) used by the delay tests, either as the min fire delay or as the lock check delay
+ private static final long DELAY = 10L;
+ // Names passed to the TriggerManager constructor; they are also part of the TRIGGERED metric keys
+ private static final String NAME_1 = "name1";
+ private static final String NAME_2 = "name2";
+ // Manually advanced processing time which the tests feed to the operator test harness
+ private long processingTime = 0L;
+ // Lock factory obtained in before(), and the two locks created from it
+ private TriggerLockFactory lockFactory;
+ private TriggerLockFactory.Lock lock;
+ private TriggerLockFactory.Lock recoveringLock;
+
+ @BeforeEach
+ void before() {
+   // Create the table used by the tests
+   sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+
+   // Open the lock factory and create the task lock and the recovery lock
+   lockFactory = lockFactory();
+   lockFactory.open();
+   lock = lockFactory.createLock();
+   recoveringLock = lockFactory.createRecoveryLock();
+
+   // Release both locks, so no test starts with a held lock
+   lock.unlock();
+   recoveringLock.unlock();
+
+   // Reset the test metrics reporter
+   MetricsReporterFactoryForTests.reset();
+ }
+
+ @AfterEach
+ void after() throws IOException {
+   // Close the lock factory which was opened in before()
+   this.lockFactory.close();
+ }
+
+ @Test
+ void testCommitNumber() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+   TriggerEvaluator commitEvaluator = new TriggerEvaluator.Builder().commitNumber(3).build();
+   TriggerManager triggerManager = manager(tableLoader, commitEvaluator);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+       harness(triggerManager)) {
+     operatorHarness.open();
+
+     // No output for the first event, then one more trigger after each of the next three events
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 0);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(3).build(), 2);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(10).build(), 3);
+
+     // Two single commit events: the output size stays the same
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 3);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 3);
+
+     // The third single commit event is expected to produce the next trigger
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 4);
+   }
+ }
+
+ @Test
+ void testFileNumber() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+   TriggerEvaluator fileNumberEvaluator = new TriggerEvaluator.Builder().fileNumber(3).build();
+   TriggerManager triggerManager = manager(tableLoader, fileNumberEvaluator);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+       harness(triggerManager)) {
+     operatorHarness.open();
+
+     // A single data file: no output yet
+     addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileNum(1).build(), 0);
+
+     // Data and delete file numbers are both used in the following events, each adds a trigger
+     addEventAndCheckResult(
+         operatorHarness, TableChange.builder().dataFileNum(1).deleteFileNum(1).build(), 1);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(3).build(), 2);
+     addEventAndCheckResult(
+         operatorHarness, TableChange.builder().dataFileNum(5).deleteFileNum(7).build(), 3);
+
+     // One data file and one delete file: the output size stays the same
+     addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileNum(1).build(), 3);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 3);
+
+     // One more data file is expected to produce the next trigger
+     addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileNum(1).build(), 4);
+   }
+ }
+
+ @Test
+ void testFileSize() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+   TriggerEvaluator fileSizeEvaluator = new TriggerEvaluator.Builder().fileSize(3).build();
+   TriggerManager triggerManager = manager(tableLoader, fileSizeEvaluator);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+       harness(triggerManager)) {
+     operatorHarness.open();
+
+     // A data file size of 1: no output yet
+     addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileSize(1L).build(), 0);
+
+     // Data and delete file sizes are both used in the following events, each adds a trigger
+     addEventAndCheckResult(
+         operatorHarness, TableChange.builder().dataFileSize(1L).deleteFileSize(1L).build(), 1);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileSize(3L).build(), 2);
+     addEventAndCheckResult(
+         operatorHarness, TableChange.builder().dataFileSize(5L).deleteFileSize(7L).build(), 3);
+
+     // Data size 1 and delete size 1: the output size stays the same
+     addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileSize(1L).build(), 3);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileSize(1L).build(), 3);
+
+     // One more data file size of 1 is expected to produce the next trigger
+     addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileSize(1L).build(), 4);
+   }
+ }
+
+ @Test
+ void testDeleteFileNumber() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+   TriggerEvaluator deleteFileEvaluator =
+       new TriggerEvaluator.Builder().deleteFileNumber(3).build();
+   TriggerManager triggerManager = manager(tableLoader, deleteFileEvaluator);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+       harness(triggerManager)) {
+     operatorHarness.open();
+
+     // Three data files but only a single delete file: no output yet
+     addEventAndCheckResult(
+         operatorHarness, TableChange.builder().dataFileNum(3).deleteFileNum(1).build(), 0);
+
+     // Each of the next three events adds one trigger
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(2).build(), 1);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(3).build(), 2);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(10).build(), 3);
+
+     // Two single delete file events: the output size stays the same
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 3);
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 3);
+
+     // The third single delete file event is expected to produce the next trigger
+     addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 4);
+   }
+ }
+
+ @Test
+ void testTimeout() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+   TriggerEvaluator timeoutEvaluator =
+       new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build();
+   TriggerManager triggerManager = manager(tableLoader, timeoutEvaluator);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+       harness(triggerManager)) {
+     operatorHarness.open();
+
+     TableChange change = TableChange.builder().dataFileSize(1).commitNum(1).build();
+     long timeoutMs = Duration.ofSeconds(1).toMillis();
+
+     // The first event arrives at EVENT_TIME: nothing is emitted
+     operatorHarness.processElement(change, EVENT_TIME);
+     assertThat(operatorHarness.extractOutputValues()).isEmpty();
+
+     // Advance the processing time by the timeout: the first trigger is expected
+     long currentTime = EVENT_TIME + timeoutMs;
+     operatorHarness.setProcessingTime(currentTime);
+     operatorHarness.processElement(change, currentTime);
+     assertThat(operatorHarness.extractOutputValues()).hasSize(1);
+
+     // Release the lock to allow the next trigger
+     lock.unlock();
+
+     // One millisecond later a new event does not produce a new trigger
+     operatorHarness.setProcessingTime(currentTime + 1);
+     operatorHarness.processElement(change, currentTime);
+     assertThat(operatorHarness.extractOutputValues()).hasSize(1);
+
+     // Advance the processing time by the timeout again: the second trigger is expected
+     currentTime += timeoutMs;
+     operatorHarness.setProcessingTime(currentTime);
+     operatorHarness.processElement(change, currentTime);
+     assertThat(operatorHarness.extractOutputValues()).hasSize(2);
+   }
+ }
+
+ @Test
+ void testStateRestore() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+   TriggerManager initialManager = manager(tableLoader);
+   OperatorSubtaskState checkpointState;
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> initialHarness =
+       harness(initialManager)) {
+     initialHarness.open();
+
+     // A single change which does not produce any output before the snapshot is taken
+     initialHarness.processElement(
+         TableChange.builder().dataFileSize(1).commitNum(1).build(), EVENT_TIME);
+     assertThat(initialHarness.extractOutputValues()).isEmpty();
+
+     checkpointState = initialHarness.snapshot(1, EVENT_TIME);
+   }
+
+   // Start a new manager from the saved state and check what it emits
+   TriggerManager restoredManager = manager(tableLoader);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> restoredHarness =
+       harness(restoredManager)) {
+     restoredHarness.initializeState(checkpointState);
+     restoredHarness.open();
+
+     // The first change after the restore is expected to emit only the recovery trigger
+     restoredHarness.processElement(TableChange.builder().commitNum(1).build(), EVENT_TIME_2);
+     List<Trigger> emitted = restoredHarness.extractOutputValues();
+     List<Trigger> recoveryOnly =
+         Lists.newArrayList(Trigger.recovery(restoredHarness.getProcessingTime()));
+     assertTriggers(emitted, recoveryOnly);
+
+     // Release the recovery lock to allow the next trigger
+     recoveringLock.unlock();
+     restoredHarness.setProcessingTime(EVENT_TIME_2);
+
+     // Now the output contains the recovery trigger and the real trigger
+     assertThat(restoredHarness.extractOutputValues()).hasSize(2);
+   }
+ }
+
+ @Test
+ void testMinFireDelay() throws Exception {
+   // DELAY is used as the min fire delay, the lock check delay is 1
+   TriggerManager triggerManager = manager(sql.tableLoader(TABLE_NAME), DELAY, 1);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+       harness(triggerManager)) {
+     operatorHarness.open();
+
+     // The first event fires immediately
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+     long firstFireTime = operatorHarness.getProcessingTime();
+
+     // The second event does not fire yet
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+
+     // Once the delay has passed since the first fire, the second trigger is emitted
+     operatorHarness.setProcessingTime(firstFireTime + DELAY);
+     assertThat(operatorHarness.extractOutputValues()).hasSize(2);
+   }
+ }
+
+ @Test
+ void testLockCheckDelay() throws Exception {
+   // The min fire delay is 1, DELAY is used as the lock check delay
+   TriggerManager triggerManager = manager(sql.tableLoader(TABLE_NAME), 1, DELAY);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+       harness(triggerManager)) {
+     operatorHarness.open();
+
+     // The first event fires immediately
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+
+     // Hold the lock to prevent the execution: the next event does not produce a trigger
+     assertThat(lock.tryLock()).isTrue();
+     addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+     long blockedTime = operatorHarness.getProcessingTime();
+
+     // Releasing the lock alone does not produce the trigger
+     lock.unlock();
+     assertThat(operatorHarness.extractOutputValues()).hasSize(1);
+
+     // The trigger is emitted once the lock check delay has passed
+     operatorHarness.setProcessingTime(blockedTime + DELAY);
+     assertThat(operatorHarness.extractOutputValues()).hasSize(2);
+   }
+ }
+
+ /**
+  * Simulates job recovery with or without a leftover table lock, and with or without a
+  * maintenance task which keeps running after the recovery.
+  *
+  * @param locked if a lock exists on the table on job recovery
+  * @param runningTask if a maintenance task is running and continues to run after job recovery
+  */
+ @ParameterizedTest
+ @MethodSource("parametersForTestRecovery")
+ void testRecovery(boolean locked, boolean runningTask) throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+   TriggerManager initialManager = manager(tableLoader);
+   OperatorSubtaskState checkpointState;
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> initialHarness =
+       harness(initialManager)) {
+     initialHarness.open();
+     checkpointState = initialHarness.snapshot(1, EVENT_TIME);
+   }
+
+   // Leave a lock behind if the scenario requires it
+   if (locked) {
+     assertThat(lock.tryLock()).isTrue();
+   }
+
+   TriggerManager restoredManager = manager(tableLoader);
+   List<Trigger> expectedTriggers = Lists.newArrayListWithExpectedSize(3);
+   try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> restoredHarness =
+       harness(restoredManager)) {
+     restoredHarness.initializeState(checkpointState);
+     restoredHarness.open();
+
+     // The first change after the restore is expected to emit the recovery trigger
+     processingTime += 1;
+     expectedTriggers.add(Trigger.recovery(processingTime));
+     restoredHarness.setProcessingTime(processingTime);
+     restoredHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime);
+     assertTriggers(restoredHarness.extractOutputValues(), expectedTriggers);
+
+     // Advancing the time alone changes nothing until the recovery is finished
+     processingTime += 1;
+     restoredHarness.setProcessingTime(processingTime);
+     assertTriggers(restoredHarness.extractOutputValues(), expectedTriggers);
+
+     if (runningTask) {
+       // Simulate the recovered maintenance task removing the lock when it finishes
+       lock.unlock();
+     }
+
+     // A new change still produces no new trigger, as the recovery is ongoing
+     processingTime += 1;
+     restoredHarness.setProcessingTime(processingTime);
+     restoredHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime);
+     assertTriggers(restoredHarness.extractOutputValues(), expectedTriggers);
+
+     // Simulate the downstream lock cleaner removing the lock and the recovery lock after it
+     // received the recovery trigger
+     lock.unlock();
+     recoveringLock.unlock();
+
+     // Exactly one additional trigger is expected after the locks are released
+     processingTime += 1;
+     restoredHarness.setProcessingTime(processingTime);
+     // The table is loaded only now, so the expected trigger carries the current table state.
+     // NOTE(review): the original comment says that releasing the lock creates a new snapshot;
+     // confirm this for the lock factory in use.
+     SerializableTable currentTable =
+         (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
+     expectedTriggers.add(Trigger.create(processingTime, currentTable, 0));
+     assertTriggers(restoredHarness.extractOutputValues(), expectedTriggers);
+   }
+ }
+
+ @Test
+ void testTriggerMetrics() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+   StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+   ManualSource<TableChange> changes =
+       new ManualSource<>(environment, TypeInformation.of(TableChange.class));
+   CollectingSink<Trigger> triggers = new CollectingSink<>();
+
+   // Two tasks: NAME_1 is configured with commitNumber(2), NAME_2 with commitNumber(4)
+   TriggerManager triggerManager =
+       new TriggerManager(
+           tableLoader,
+           lockFactory,
+           Lists.newArrayList(NAME_1, NAME_2),
+           Lists.newArrayList(
+               new TriggerEvaluator.Builder().commitNumber(2).build(),
+               new TriggerEvaluator.Builder().commitNumber(4).build()),
+           1L,
+           1L);
+   changes
+       .dataStream()
+       .keyBy(unused -> true)
+       .process(triggerManager)
+       .name(DUMMY_NAME)
+       .forceNonParallel()
+       .sinkTo(triggers);
+
+   // Metric keys checked by this test
+   String nothingToTriggerKey = DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER;
+   String firstTaskTriggeredKey = DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED;
+   String secondTaskTriggeredKey = DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED;
+   Duration pollTimeout = Duration.ofSeconds(5);
+
+   JobClient jobClient = null;
+   try {
+     jobClient = environment.executeAsync();
+
+     // This one doesn't trigger - tests NOTHING_TO_TRIGGER
+     changes.sendRecord(TableChange.builder().commitNum(1).build());
+
+     Awaitility.await()
+         .until(
+             () -> {
+               // The counter can be null, so check for it before comparing
+               Long nothingToTriggerCount =
+                   MetricsReporterFactoryForTests.counter(nothingToTriggerKey);
+               return nothingToTriggerCount != null && nothingToTriggerCount.equals(1L);
+             });
+
+     // Trigger one of the tasks - tests TRIGGERED
+     changes.sendRecord(TableChange.builder().commitNum(1).build());
+     // Wait until we receive the trigger
+     assertThat(triggers.poll(pollTimeout)).isNotNull();
+     assertThat(MetricsReporterFactoryForTests.counter(firstTaskTriggeredKey)).isEqualTo(1L);
+     lock.unlock();
+
+     // Trigger both of the tasks - tests TRIGGERED
+     changes.sendRecord(TableChange.builder().commitNum(2).build());
+     // Wait for the two triggers, releasing the lock after each of them
+     assertThat(triggers.poll(pollTimeout)).isNotNull();
+     lock.unlock();
+     assertThat(triggers.poll(pollTimeout)).isNotNull();
+     lock.unlock();
+     assertThat(MetricsReporterFactoryForTests.counter(firstTaskTriggeredKey)).isEqualTo(2L);
+     assertThat(MetricsReporterFactoryForTests.counter(secondTaskTriggeredKey)).isEqualTo(1L);
+
+     // Final check all the counters
+     // NOTE(review): -1L is passed for the two throttling counters; presumably the helper treats
+     // it as "do not check the exact value" - confirm in MetricsReporterFactoryForTests
+     ImmutableMap<String, Long> expectedCounters =
+         new ImmutableMap.Builder<String, Long>()
+             .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, -1L)
+             .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, -1L)
+             .put(firstTaskTriggeredKey, 2L)
+             .put(secondTaskTriggeredKey, 1L)
+             .put(nothingToTriggerKey, 1L)
+             .build();
+     MetricsReporterFactoryForTests.assertCounters(expectedCounters);
+   } finally {
+     closeJobClient(jobClient);
+   }
+ }
+
+ /**
+  * Checks the {@code RATE_LIMITER_TRIGGERED} counter. The manager is created with a very high min
+  * fire delay: the first change emits a trigger, and after the second change the rate limiter
+  * counter is expected to reach 1.
+  */
+ @Test
+ void testRateLimiterMetrics() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   ManualSource<TableChange> source =
+       new ManualSource<>(env, TypeInformation.of(TableChange.class));
+   CollectingSink<Trigger> sink = new CollectingSink<>();
+
+   // High min fire delay, so only triggered once
+   TriggerManager manager = manager(tableLoader, 1_000_000L, 1L);
+   source
+       .dataStream()
+       .keyBy(unused -> true)
+       .process(manager)
+       .name(DUMMY_NAME)
+       .forceNonParallel()
+       .sinkTo(sink);
+
+   JobClient jobClient = null;
+   try {
+     jobClient = env.executeAsync();
+
+     // Start the first trigger
+     source.sendRecord(TableChange.builder().commitNum(2).build());
+     assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+     // Remove the lock to allow the next trigger
+     lock.unlock();
+
+     // The second trigger will be blocked
+     source.sendRecord(TableChange.builder().commitNum(2).build());
+     Awaitility.await()
+         .until(
+             () ->
+                 // Compare null-safely: the counter lookup can return null (presumably until the
+                 // metric is first reported), and an exception thrown here would fail the wait
+                 Long.valueOf(1L)
+                     .equals(
+                         MetricsReporterFactoryForTests.counter(
+                             DUMMY_NAME
+                                 + "."
+                                 + GROUP_VALUE_DEFAULT
+                                 + "."
+                                 + RATE_LIMITER_TRIGGERED)));
+
+     // Final check all the counters
+     assertCounters(1L, 0L);
+   } finally {
+     closeJobClient(jobClient);
+   }
+ }
+
+ /**
+  * Checks the {@code CONCURRENT_RUN_THROTTLED} counter. The lock is not released after the first
+  * trigger, so after the second change the concurrent run counter is expected to reach 1.
+  */
+ @Test
+ void testConcurrentRunMetrics() throws Exception {
+   TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   ManualSource<TableChange> source =
+       new ManualSource<>(env, TypeInformation.of(TableChange.class));
+   CollectingSink<Trigger> sink = new CollectingSink<>();
+
+   // Min fire delay of 1 and a high lock check delay
+   TriggerManager manager = manager(tableLoader, 1L, 1_000_000L);
+   source
+       .dataStream()
+       .keyBy(unused -> true)
+       .process(manager)
+       .name(DUMMY_NAME)
+       .forceNonParallel()
+       .sinkTo(sink);
+
+   JobClient jobClient = null;
+   try {
+     jobClient = env.executeAsync();
+
+     // Start the first trigger - notice that we do not remove the lock after the trigger
+     source.sendRecord(TableChange.builder().commitNum(2).build());
+     assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+     // The second trigger will be blocked by the lock
+     source.sendRecord(TableChange.builder().commitNum(2).build());
+     Awaitility.await()
+         .until(
+             () ->
+                 // Compare null-safely: the counter lookup can return null (presumably until the
+                 // metric is first reported), and an exception thrown here would fail the wait
+                 Long.valueOf(1L)
+                     .equals(
+                         MetricsReporterFactoryForTests.counter(
+                             DUMMY_NAME
+                                 + "."
+                                 + GROUP_VALUE_DEFAULT
+                                 + "."
+                                 + CONCURRENT_RUN_THROTTLED)));
+
+     // Final check all the counters
+     assertCounters(0L, 1L);
+   } finally {
+     closeJobClient(jobClient);
+   }
+ }
+
+ /**
+  * Provides the {@code (locked, runningTask)} argument pairs for {@code testRecovery}, one pair
+  * for every combination of the two flags.
+  *
+  * @return the stream of argument pairs
+  */
+ private static Stream<Arguments> parametersForTestRecovery() {
+   return Stream.of(
+       // Previously (true, false) was listed twice and (true, true) was never exercised
+       Arguments.of(true, true),
+       Arguments.of(true, false),
+       Arguments.of(false, true),
+       Arguments.of(false, false));
+ }
+
+ /**
+  * Checks the counters of the single task tests: the two throttling counters are compared to the
+  * given values, the NAME_1 trigger counter to 1 and the nothing-to-trigger counter to 0.
+  *
+  * @param rateLimiterTrigger expected value of the RATE_LIMITER_TRIGGERED counter
+  * @param concurrentRunTrigger expected value of the CONCURRENT_RUN_THROTTLED counter
+  */
+ private void assertCounters(long rateLimiterTrigger, long concurrentRunTrigger) {
+   String defaultGroupPrefix = DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + ".";
+   ImmutableMap<String, Long> expectedCounters =
+       new ImmutableMap.Builder<String, Long>()
+           .put(defaultGroupPrefix + RATE_LIMITER_TRIGGERED, rateLimiterTrigger)
+           .put(defaultGroupPrefix + CONCURRENT_RUN_THROTTLED, concurrentRunTrigger)
+           .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 1L)
+           .put(defaultGroupPrefix + NOTHING_TO_TRIGGER, 0L)
+           .build();
+   MetricsReporterFactoryForTests.assertCounters(expectedCounters);
+ }
+
+ /**
+  * Wraps the manager into a keyed process operator, and creates a test harness for it where every
+  * record is mapped to the same {@code true} key.
+  *
+  * @param manager the function to test
+  * @return the test harness around the manager
+  */
+ private KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> harness(
+     TriggerManager manager) throws Exception {
+   KeyedProcessOperator<Boolean, TableChange, Trigger> operator =
+       new KeyedProcessOperator<>(manager);
+   return new KeyedOneInputStreamOperatorTestHarness<>(operator, unused -> true, Types.BOOLEAN);
+ }
+
+ /**
+  * Advances the processing time by one, sends the event to the harness at that time, checks the
+  * number of output values emitted so far, and finally releases the lock.
+  *
+  * @param testHarness the harness receiving the event
+  * @param event the change to process
+  * @param expectedSize the expected number of output values after the event
+  */
+ private void addEventAndCheckResult(
+     OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness,
+     TableChange event,
+     int expectedSize)
+     throws Exception {
+   processingTime += 1;
+   long now = processingTime;
+   testHarness.setProcessingTime(now);
+   testHarness.processElement(event, now);
+
+   List<Trigger> emitted = testHarness.extractOutputValues();
+   assertThat(emitted).hasSize(expectedSize);
+
+   // Release the lock, so the next event is able to trigger
+   lock.unlock();
+ }
+
+ /**
+  * Creates a manager for the single NAME_1 task with the given evaluator; both delays are 1.
+  *
+  * @param tableLoader loader of the test table
+  * @param evaluator the evaluator of the task
+  * @return the new manager
+  */
+ private TriggerManager manager(TableLoader tableLoader, TriggerEvaluator evaluator) {
+   long minFireDelayMs = 1;
+   long lockCheckDelayMs = 1;
+   return new TriggerManager(
+       tableLoader,
+       lockFactory,
+       Lists.newArrayList(NAME_1),
+       Lists.newArrayList(evaluator),
+       minFireDelayMs,
+       lockCheckDelayMs);
+ }
+
+ /**
+  * Creates a manager for the single NAME_1 task with a {@code commitNumber(2)} evaluator and the
+  * given delays.
+  *
+  * @param tableLoader loader of the test table
+  * @param minFireDelayMs the min fire delay passed to the manager
+  * @param lockCheckDelayMs the lock check delay passed to the manager
+  * @return the new manager
+  */
+ private TriggerManager manager(
+     TableLoader tableLoader, long minFireDelayMs, long lockCheckDelayMs) {
+   TriggerManager triggerManager =
+       new TriggerManager(
+           tableLoader,
+           lockFactory,
+           Lists.newArrayList(NAME_1),
+           Lists.newArrayList(new TriggerEvaluator.Builder().commitNumber(2).build()),
+           minFireDelayMs,
+           lockCheckDelayMs);
+   return triggerManager;
+ }
+
+ /**
+  * Creates the default manager of the tests: a single task with a {@code commitNumber(2)}
+  * evaluator.
+  *
+  * @param tableLoader loader of the test table
+  * @return the new manager
+  */
+ private TriggerManager manager(TableLoader tableLoader) {
+   TriggerEvaluator defaultEvaluator = new TriggerEvaluator.Builder().commitNumber(2).build();
+   return manager(tableLoader, defaultEvaluator);
+ }
+
+ /**
+  * Asserts that the two lists have the same size, and that the triggers at the same index have
+  * the same timestamp, task id and recovery flag. If the expected trigger has no table, then the
+  * actual one must have none either. Otherwise the snapshot ids of the two tables are compared in
+  * iteration order, for as many snapshots as the expected table has.
+  *
+  * @param expected the reference triggers; their snapshots drive the snapshot comparison
+  * @param actual the triggers to check
+  */
+ private static void assertTriggers(List<Trigger> expected, List<Trigger> actual) {
+   assertThat(actual).hasSize(expected.size());
+   for (int i = 0; i < expected.size(); ++i) {
+     Trigger expectedTrigger = expected.get(i);
+     Trigger actualTrigger = actual.get(i);
+     assertThat(actualTrigger.timestamp()).isEqualTo(expectedTrigger.timestamp());
+     assertThat(actualTrigger.taskId()).isEqualTo(expectedTrigger.taskId());
+     assertThat(actualTrigger.isRecovery()).isEqualTo(expectedTrigger.isRecovery());
+     if (expectedTrigger.table() == null) {
+       assertThat(actualTrigger.table()).isNull();
+     } else {
+       // Fail with an assertion error instead of a NullPointerException if the table is missing
+       assertThat(actualTrigger.table()).isNotNull();
+       Iterator<Snapshot> expectedSnapshots = expectedTrigger.table().snapshots().iterator();
+       Iterator<Snapshot> actualSnapshots = actualTrigger.table().snapshots().iterator();
+       while (expectedSnapshots.hasNext()) {
+         assertThat(actualSnapshots.hasNext()).isTrue();
+         assertThat(expectedSnapshots.next().snapshotId())
+             .isEqualTo(actualSnapshots.next().snapshotId());
+       }
+     }
+   }
+ }
+}
diff --git
a/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
b/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000000..952255a52b
--- /dev/null
+++
b/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests