This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f2d62757e2 Flink: Port #10484 to v1.20 (#10989)
f2d62757e2 is described below

commit f2d62757e210df5498f147aa126926477ac6c156
Author: pvary <[email protected]>
AuthorDate: Fri Aug 23 15:42:24 2024 +0200

    Flink: Port #10484 to v1.20 (#10989)
---
 flink/v1.20/build.gradle                           |   1 +
 .../maintenance/operator/JdbcLockFactory.java      | 321 +++++++++++
 .../flink/maintenance/operator/MonitorSource.java  |   4 +-
 .../flink/maintenance/operator/TableChange.java    |  43 +-
 .../operator/TableMaintenanceMetrics.java          |  34 ++
 .../flink/maintenance/operator/Trigger.java        |  72 +++
 .../maintenance/operator/TriggerEvaluator.java     | 128 +++++
 .../maintenance/operator/TriggerLockFactory.java   |  63 +++
 .../flink/maintenance/operator/TriggerManager.java | 339 +++++++++++
 .../maintenance/operator/ConstantsForTests.java    |  29 +
 .../operator/MetricsReporterFactoryForTests.java   | 153 +++++
 .../maintenance/operator/OperatorTestBase.java     | 107 +++-
 .../maintenance/operator/TestJdbcLockFactory.java  |  57 ++
 .../maintenance/operator/TestLockFactoryBase.java  |  80 +++
 .../maintenance/operator/TestMonitorSource.java    |  28 +-
 .../maintenance/operator/TestTriggerManager.java   | 622 +++++++++++++++++++++
 ...he.flink.metrics.reporter.MetricReporterFactory |  16 +
 17 files changed, 2081 insertions(+), 16 deletions(-)

diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index f2e1fb51a1..294c88d907 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -119,6 +119,7 @@ 
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
     testImplementation libs.awaitility
     testImplementation libs.assertj.core
+    testImplementation libs.sqlite.jdbc
   }
 
   test {
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
new file mode 100644
index 0000000000..21c8935abe
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.SQLTimeoutException;
+import java.sql.SQLTransientConnectionException;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcLockFactory implements TriggerLockFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JdbcLockFactory.class);
+
+  @VisibleForTesting
+  static final String INIT_LOCK_TABLES_PROPERTY = 
"flink-maintenance.lock.jdbc.init-lock-tables";
+
+  private static final String LOCK_TABLE_NAME = "flink_maintenance_lock";
+  private static final int LOCK_ID_MAX_LENGTH = 100;
+  private static final String CREATE_LOCK_TABLE_SQL =
+      String.format(
+          "CREATE TABLE %s "
+              + "(LOCK_TYPE CHAR(1) NOT NULL, "
+              + "LOCK_ID VARCHAR(%s) NOT NULL, "
+              + "INSTANCE_ID CHAR(36) NOT NULL, PRIMARY KEY (LOCK_TYPE, 
LOCK_ID))",
+          LOCK_TABLE_NAME, LOCK_ID_MAX_LENGTH);
+
+  private static final String CREATE_LOCK_SQL =
+      String.format(
+          "INSERT INTO %s (LOCK_TYPE, LOCK_ID, INSTANCE_ID) VALUES (?, ?, ?)", 
LOCK_TABLE_NAME);
+  private static final String GET_LOCK_SQL =
+      String.format("SELECT INSTANCE_ID FROM %s WHERE LOCK_TYPE=? AND 
LOCK_ID=?", LOCK_TABLE_NAME);
+  private static final String DELETE_LOCK_SQL =
+      String.format(
+          "DELETE FROM %s WHERE LOCK_TYPE=? AND LOCK_ID=? AND INSTANCE_ID=?", 
LOCK_TABLE_NAME);
+
+  private final String uri;
+  private final String lockId;
+  private final Map<String, String> properties;
+  private transient JdbcClientPool pool;
+
+  /**
+   * Creates a new {@link TriggerLockFactory}. The lockId should be unique 
between the users of the
+   * same uri.
+   *
+   * @param uri of the jdbc connection
+   * @param lockId which should identify the job and the table
+   * @param properties used for creating the jdbc connection pool
+   */
+  public JdbcLockFactory(String uri, String lockId, Map<String, String> 
properties) {
+    Preconditions.checkNotNull(uri, "JDBC connection URI is required");
+    Preconditions.checkNotNull(properties, "Properties map is required");
+    Preconditions.checkArgument(
+        lockId.length() < LOCK_ID_MAX_LENGTH,
+        "Invalid prefix length: lockId should be shorter than %s",
+        LOCK_ID_MAX_LENGTH);
+    this.uri = uri;
+    this.lockId = lockId;
+    this.properties = properties;
+  }
+
+  @Override
+  public void open() {
+    this.pool = new JdbcClientPool(1, uri, properties);
+
+    if (PropertyUtil.propertyAsBoolean(properties, INIT_LOCK_TABLES_PROPERTY, 
false)) {
+      initializeLockTables();
+    }
+  }
+
+  /** Only used in testing to share the jdbc pool */
+  @VisibleForTesting
+  void open(JdbcLockFactory other) {
+    this.pool = other.pool;
+  }
+
+  @Override
+  public Lock createLock() {
+    return new Lock(pool, lockId, Type.MAINTENANCE);
+  }
+
+  @Override
+  public Lock createRecoveryLock() {
+    return new Lock(pool, lockId, Type.RECOVERY);
+  }
+
+  @Override
+  public void close() throws IOException {
+    pool.close();
+  }
+
+  private void initializeLockTables() {
+    LOG.debug("Creating database tables (if missing) to store table 
maintenance locks");
+    try {
+      pool.run(
+          conn -> {
+            DatabaseMetaData dbMeta = conn.getMetaData();
+            ResultSet tableExists =
+                dbMeta.getTables(
+                    null /* catalog name */,
+                    null /* schemaPattern */,
+                    LOCK_TABLE_NAME /* tableNamePattern */,
+                    null /* types */);
+            if (tableExists.next()) {
+              LOG.debug("Flink maintenance lock table already exists");
+              return true;
+            }
+
+            LOG.info("Creating Flink maintenance lock table {}", 
LOCK_TABLE_NAME);
+            return conn.prepareStatement(CREATE_LOCK_TABLE_SQL).execute();
+          });
+
+    } catch (SQLTimeoutException e) {
+      throw new UncheckedSQLException(
+          e, "Cannot initialize JDBC table maintenance lock: Query timed out");
+    } catch (SQLTransientConnectionException | 
SQLNonTransientConnectionException e) {
+      throw new UncheckedSQLException(
+          e, "Cannot initialize JDBC table maintenance lock: Connection 
failed");
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Cannot initialize JDBC table 
maintenance lock");
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e, "Interrupted in call to 
initialize");
+    }
+  }
+
+  public static class Lock implements TriggerLockFactory.Lock {
+    private final JdbcClientPool pool;
+    private final String lockId;
+    private final Type type;
+
+    public Lock(JdbcClientPool pool, String lockId, Type type) {
+      this.pool = pool;
+      this.lockId = lockId;
+      this.type = type;
+    }
+
+    @Override
+    public boolean tryLock() {
+      if (isHeld()) {
+        LOG.info("Lock is already held for {}", this);
+        return false;
+      }
+
+      String newInstanceId = UUID.randomUUID().toString();
+      try {
+        return pool.run(
+            conn -> {
+              try (PreparedStatement sql = 
conn.prepareStatement(CREATE_LOCK_SQL)) {
+                sql.setString(1, type.key);
+                sql.setString(2, lockId);
+                sql.setString(3, newInstanceId);
+                int count = sql.executeUpdate();
+                LOG.info(
+                    "Created {} lock with instanceId {} with row count {}",
+                    this,
+                    newInstanceId,
+                    count);
+                return count == 1;
+              }
+            });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedInterruptedException(e, "Interrupted during 
tryLock");
+      } catch (SQLException e) {
+        // SQL exception happened when creating the lock. Check if the lock 
creation was
+        // successful behind the scenes.
+        if (newInstanceId.equals(instanceId())) {
+          return true;
+        } else {
+          throw new UncheckedSQLException(e, "Failed to create %s lock", this);
+        }
+      }
+    }
+
+    @SuppressWarnings("checkstyle:NestedTryDepth")
+    @Override
+    public boolean isHeld() {
+      try {
+        return pool.run(
+            conn -> {
+              try (PreparedStatement sql = 
conn.prepareStatement(GET_LOCK_SQL)) {
+                sql.setString(1, type.key);
+                sql.setString(2, lockId);
+                try (ResultSet rs = sql.executeQuery()) {
+                  return rs.next();
+                }
+              }
+            });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedInterruptedException(e, "Interrupted during 
isHeld");
+      } catch (SQLException e) {
+        // SQL exception happened when getting lock information
+        throw new UncheckedSQLException(e, "Failed to get lock information for 
%s", this);
+      }
+    }
+
+    @SuppressWarnings("checkstyle:NestedTryDepth")
+    @Override
+    public void unlock() {
+      try {
+        // Possible concurrency issue:
+        // - `unlock` and `tryLock` happen at the same time when there is an 
existing lock
+        //
+        // Steps:
+        // 1. `unlock` removes the lock in the database, but there is a 
temporary connection failure
+        // 2. `tryLock` finds that there is no lock, so creates a new lock
+        // 3. `unlock` retries the lock removal and removes the new lock
+        //
+        // To prevent the situation above we fetch the current instanceId, and 
remove the lock
+        // only with the given id.
+        String instanceId = instanceId();
+
+        if (instanceId != null) {
+          pool.run(
+              conn -> {
+                try (PreparedStatement sql = 
conn.prepareStatement(DELETE_LOCK_SQL)) {
+                  sql.setString(1, type.key);
+                  sql.setString(2, lockId);
+                  sql.setString(3, instanceId);
+                  long count = sql.executeUpdate();
+                  LOG.info(
+                      "Deleted {} lock with instanceId {} with row count {}",
+                      this,
+                      instanceId,
+                      count);
+                } catch (SQLException e) {
+                  // SQL exception happened when deleting lock information
+                  throw new UncheckedSQLException(
+                      e, "Failed to delete %s lock with instanceId %s", this, 
instanceId);
+                }
+
+                return null;
+              });
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedInterruptedException(e, "Interrupted during 
unlock");
+      } catch (UncheckedSQLException e) {
+        throw e;
+      } catch (SQLException e) {
+        // SQL exception happened when getting/updating lock information
+        throw new UncheckedSQLException(e, "Failed to remove lock %s", this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("type", type).add("lockId", 
lockId).toString();
+    }
+
+    @SuppressWarnings("checkstyle:NestedTryDepth")
+    private String instanceId() {
+      try {
+        return pool.run(
+            conn -> {
+              try (PreparedStatement sql = 
conn.prepareStatement(GET_LOCK_SQL)) {
+                sql.setString(1, type.key);
+                sql.setString(2, lockId);
+                try (ResultSet rs = sql.executeQuery()) {
+                  if (rs.next()) {
+                    return rs.getString(1);
+                  } else {
+                    return null;
+                  }
+                }
+              } catch (SQLException e) {
+                // SQL exception happened when getting lock information
+                throw new UncheckedSQLException(e, "Failed to get lock 
information for %s", type);
+              }
+            });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedInterruptedException(e, "Interrupted during 
unlock");
+      } catch (SQLException e) {
+        throw new UncheckedSQLException(e, "Failed to get lock information for 
%s", type);
+      }
+    }
+  }
+
+  private enum Type {
+    MAINTENANCE("m"),
+    RECOVERY("r");
+
+    private String key;
+
+    Type(String key) {
+      this.key = key;
+    }
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
index d74b2349b1..89efffa15f 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 
 /** Monitors an Iceberg table for changes */
 @Internal
-public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
+class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
   private static final Logger LOG = 
LoggerFactory.getLogger(MonitorSource.class);
 
   private final TableLoader tableLoader;
@@ -58,7 +58,7 @@ public class MonitorSource extends 
SingleThreadedIteratorSource<TableChange> {
    * @param rateLimiterStrategy limits the frequency the table is checked
    * @param maxReadBack sets the number of snapshots read before stopping 
change collection
    */
-  public MonitorSource(
+  MonitorSource(
       TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long 
maxReadBack) {
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
     Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy 
should no be null");
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
index 452ed80ed0..7d0b94e97d 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
@@ -35,7 +35,7 @@ class TableChange {
   private long deleteFileSize;
   private int commitNum;
 
-  TableChange(
+  private TableChange(
       int dataFileNum, int deleteFileNum, long dataFileSize, long 
deleteFileSize, int commitNum) {
     this.dataFileNum = dataFileNum;
     this.deleteFileNum = deleteFileNum;
@@ -67,6 +67,10 @@ class TableChange {
     return new TableChange(0, 0, 0L, 0L, 0);
   }
 
+  static Builder builder() {
+    return new Builder();
+  }
+
   int dataFileNum() {
     return dataFileNum;
   }
@@ -130,4 +134,41 @@ class TableChange {
   public int hashCode() {
     return Objects.hash(dataFileNum, deleteFileNum, dataFileSize, 
deleteFileSize, commitNum);
   }
+
+  static class Builder {
+    private int dataFileNum = 0;
+    private int deleteFileNum = 0;
+    private long dataFileSize = 0L;
+    private long deleteFileSize = 0L;
+    private int commitNum = 0;
+
+    public Builder dataFileNum(int newDataFileNum) {
+      this.dataFileNum = newDataFileNum;
+      return this;
+    }
+
+    public Builder deleteFileNum(int newDeleteFileNum) {
+      this.deleteFileNum = newDeleteFileNum;
+      return this;
+    }
+
+    public Builder dataFileSize(long newDataFileSize) {
+      this.dataFileSize = newDataFileSize;
+      return this;
+    }
+
+    public Builder deleteFileSize(long newDeleteFileSize) {
+      this.deleteFileSize = newDeleteFileSize;
+      return this;
+    }
+
+    public Builder commitNum(int newCommitNum) {
+      this.commitNum = newCommitNum;
+      return this;
+    }
+
+    public TableChange build() {
+      return new TableChange(dataFileNum, deleteFileNum, dataFileSize, 
deleteFileSize, commitNum);
+    }
+  }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
new file mode 100644
index 0000000000..ec0fd920c3
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+public class TableMaintenanceMetrics {
+  public static final String GROUP_KEY = "maintenanceTask";
+  public static final String GROUP_VALUE_DEFAULT = "maintenanceTask";
+
+  // TriggerManager metrics
+  public static final String RATE_LIMITER_TRIGGERED = "rateLimiterTriggered";
+  public static final String CONCURRENT_RUN_THROTTLED = 
"concurrentRunThrottled";
+  public static final String TRIGGERED = "triggered";
+  public static final String NOTHING_TO_TRIGGER = "nothingToTrigger";
+
+  private TableMaintenanceMetrics() {
+    // do not instantiate
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java
new file mode 100644
index 0000000000..85c6c8dbdd
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+@Internal
+class Trigger {
+  private final long timestamp;
+  private final SerializableTable table;
+  private final Integer taskId;
+  private final boolean isRecovery;
+
+  private Trigger(long timestamp, SerializableTable table, Integer taskId, 
boolean isRecovery) {
+    this.timestamp = timestamp;
+    this.table = table;
+    this.taskId = taskId;
+    this.isRecovery = isRecovery;
+  }
+
+  static Trigger create(long timestamp, SerializableTable table, int taskId) {
+    return new Trigger(timestamp, table, taskId, false);
+  }
+
+  static Trigger recovery(long timestamp) {
+    return new Trigger(timestamp, null, null, true);
+  }
+
+  long timestamp() {
+    return timestamp;
+  }
+
+  SerializableTable table() {
+    return table;
+  }
+
+  Integer taskId() {
+    return taskId;
+  }
+
+  boolean isRecovery() {
+    return isRecovery;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("timestamp", timestamp)
+        .add("table", table == null ? null : table.name())
+        .add("taskId", taskId)
+        .add("isRecovery", isRecovery)
+        .toString();
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
new file mode 100644
index 0000000000..37e4e3afd4
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+class TriggerEvaluator implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerEvaluator.class);
+  private final List<Predicate> predicates;
+
+  private TriggerEvaluator(List<Predicate> predicates) {
+    Preconditions.checkArgument(!predicates.isEmpty(), "Provide at least 1 
condition.");
+
+    this.predicates = predicates;
+  }
+
+  boolean check(TableChange event, long lastTimeMs, long currentTimeMs) {
+    boolean result =
+        predicates.stream()
+            .anyMatch(
+                p -> {
+                  try {
+                    return p.evaluate(event, lastTimeMs, currentTimeMs);
+                  } catch (Exception e) {
+                    throw new RuntimeException("Error accessing state", e);
+                  }
+                });
+    LOG.debug(
+        "Checking event: {}, at {}, last: {} with result: {}",
+        event,
+        currentTimeMs,
+        lastTimeMs,
+        result);
+    return result;
+  }
+
+  static class Builder implements Serializable {
+    private Integer commitNumber;
+    private Integer fileNumber;
+    private Long fileSize;
+    private Integer deleteFileNumber;
+    private Duration timeout;
+
+    Builder commitNumber(int newCommitNumber) {
+      this.commitNumber = newCommitNumber;
+      return this;
+    }
+
+    Builder fileNumber(int newFileNumber) {
+      this.fileNumber = newFileNumber;
+      return this;
+    }
+
+    Builder fileSize(long newFileSize) {
+      this.fileSize = newFileSize;
+      return this;
+    }
+
+    Builder deleteFileNumber(int newDeleteFileNumber) {
+      this.deleteFileNumber = newDeleteFileNumber;
+      return this;
+    }
+
+    Builder timeout(Duration newTimeout) {
+      this.timeout = newTimeout;
+      return this;
+    }
+
+    TriggerEvaluator build() {
+      List<Predicate> predicates = Lists.newArrayList();
+      if (commitNumber != null) {
+        predicates.add((change, unused, unused2) -> change.commitNum() >= 
commitNumber);
+      }
+
+      if (fileNumber != null) {
+        predicates.add(
+            (change, unused, unused2) ->
+                change.dataFileNum() + change.deleteFileNum() >= fileNumber);
+      }
+
+      if (fileSize != null) {
+        predicates.add(
+            (change, unused, unused2) ->
+                change.dataFileSize() + change.deleteFileSize() >= fileSize);
+      }
+
+      if (deleteFileNumber != null) {
+        predicates.add((change, unused, unused2) -> change.deleteFileNum() >= 
deleteFileNumber);
+      }
+
+      if (timeout != null) {
+        predicates.add(
+            (change, lastTimeMs, currentTimeMs) ->
+                currentTimeMs - lastTimeMs >= timeout.toMillis());
+      }
+
+      return new TriggerEvaluator(predicates);
+    }
+  }
+
+  private interface Predicate extends Serializable {
+    boolean evaluate(TableChange event, long lastTimeMs, long currentTimeMs);
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java
new file mode 100644
index 0000000000..329223d27c
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import org.apache.flink.annotation.Experimental;
+
+/** Lock interface for handling locks for the Flink Table Maintenance jobs. */
+@Experimental
+public interface TriggerLockFactory extends Serializable, Closeable {
+  void open();
+
+  Lock createLock();
+
+  Lock createRecoveryLock();
+
+  interface Lock {
+    /**
+     * Tries to acquire a lock with a given key. Anyone already holding a lock 
would prevent
+     * acquiring this lock. Not reentrant.
+     *
+     * <p>Called by {@link TriggerManager}. Implementations may assume that 
there are no concurrent
+     * calls for this method.
+     *
+     * @return <code>true</code> if the lock is acquired by this job, 
<code>false</code> if the lock
+     *     is already held by someone
+     */
+    boolean tryLock();
+
+    /**
+     * Checks if the lock is already taken.
+     *
+     * @return <code>true</code> if the lock is held by someone
+     */
+    boolean isHeld();
+
+    // TODO: Fix the link to the LockRemover when we have a final name and 
implementation
+    /**
+     * Releases the lock. Should not fail if the lock is not held by anyone.
+     *
+     * <p>Called by LockRemover. Implementations may assume that there are no 
concurrent calls for this
+     * method.
+     */
+    void unlock();
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
new file mode 100644
index 0000000000..f4c3c1d47c
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
+ * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
+ * TriggerLockFactory.Lock}. The current implementation only handles conflicts 
within a single job.
+ * Users should avoid scheduling maintenance for the same table in different 
Flink jobs.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunThrottledCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean shouldRestoreTasks = false;
+  private transient boolean inited = false;
+  // To keep the task scheduling fair we keep the last triggered task position 
in memory.
+  // If we find a task to trigger, then we run it, but after it is finished, 
we start from the given
+  // position to prevent "starvation" of the tasks.
+  // When there is nothing to trigger, we start from the beginning, as the 
order of the tasks might
+  // be important (RewriteDataFiles first, and then RewriteManifestFiles later)
+  private transient int startsFrom = 0;
+  private transient boolean triggered = false;
+
+  /**
+   * Creates the trigger manager after validating the arguments.
+   *
+   * @param tableLoader loader for the table; opened in {@code open} and used to load the table
+   *     attached to every fired {@link Trigger}
+   * @param lockFactory factory providing the maintenance lock and the recovery lock
+   * @param taskNames names of the tasks, used as metric group values for the per-task trigger
+   *     counters; must have the same size as {@code evaluators}
+   * @param evaluators one evaluator per task, at the same index as the task name
+   * @param minFireDelayMs delay in milliseconds before the next evaluation after a trigger fired
+   * @param lockCheckDelayMs delay in milliseconds before retrying when a lock is still held
+   */
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    // NOTE(review): the two messages below read "should no be null" - looks like a typo for
+    // "should not be null". Left untouched here because the text is emitted at runtime.
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+    Preconditions.checkArgument(
+        taskNames != null && !taskNames.isEmpty(), "Invalid task names: null 
or empty");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null 
or empty");
+    // Task names and evaluators are matched by position, so the sizes must agree
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 
ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  /**
+   * Registers the metrics, obtains the state handles and opens the table loader.
+   *
+   * <p>The three throttling counters are registered under the default group value, while one
+   * "triggered" counter is registered per task name, in the same order as {@code taskNames}.
+   */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunThrottledCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    // triggerCounters.get(i) belongs to taskNames.get(i) / evaluators.get(i)
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    // State handles only; the values are read lazily in init() and written in snapshotState()
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  /**
+   * Copies the in-memory values (next evaluation time, accumulated changes, last trigger times)
+   * into the Flink state. Nothing is written until {@code init} has run, because before that the
+   * in-memory values have not been loaded from the state yet.
+   */
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+      LOG.info(
+          "Storing state: nextEvaluationTime {}, accumulatedChanges {}, 
lastTriggerTimes {}",
+          nextEvaluationTime,
+          accumulatedChanges,
+          lastTriggerTimes);
+    } else {
+      LOG.info("Not initialized, state is not stored");
+    }
+  }
+
+  /**
+   * Opens the lock factory and creates the maintenance and recovery locks. When the context
+   * reports a restored state, it only sets a flag; the recovery handling itself happens in
+   * {@code init} and {@code checkAndFire}.
+   */
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    lockFactory.open();
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      shouldRestoreTasks = true;
+    }
+  }
+
+  /**
+   * Merges the incoming change into the accumulated change of every task, then either evaluates
+   * the triggers immediately (no evaluation is scheduled) or only counts the event as
+   * rate-limited (an evaluation timer is already pending).
+   */
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    // Every task keeps its own accumulator, so the same change is merged into each of them
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    // A non-null nextEvaluationTime means a timer is pending; onTimer will do the evaluation
+    if (nextEvaluationTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
+          current,
+          nextEvaluationTime,
+          accumulatedChanges);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  /**
+   * Timer callback: clears the pending evaluation time and re-evaluates the triggers using the
+   * current processing time.
+   */
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
+    init(out, ctx.timerService());
+    // The pending timer has fired; checkAndFire may schedule a new one
+    this.nextEvaluationTime = null;
+    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+  }
+
+  /** Closes the table loader first and then the lock factory. */
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+    lockFactory.close();
+  }
+
+  /**
+   * Evaluates the triggers and emits at most one {@link Trigger}.
+   *
+   * <p>Steps: (1) while a recovery is in progress and the recovery lock is still held, only
+   * re-schedule; (2) ask {@code nextTrigger} for the next task to start, beginning at {@code
+   * startsFrom}; (3) if a task is found, try to take the maintenance lock - on success emit the
+   * trigger, reset that task's accumulator and schedule the next evaluation after {@code
+   * minFireDelayMs}; on failure retry the same task after {@code lockCheckDelayMs}.
+   *
+   * @param current the current processing time, used as the trigger timestamp
+   * @param timerService used for registering the next evaluation timer
+   * @param out collector for the emitted triggers
+   */
+  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out) {
+    if (shouldRestoreTasks) {
+      if (recoveryLock.isHeld()) {
+        // Recovered tasks in progress. Skip trigger check
+        LOG.debug("The recovery lock is still held at {}", current);
+        schedule(timerService, current + lockCheckDelayMs);
+        return;
+      } else {
+        LOG.info("The recovery is finished at {}", current);
+        shouldRestoreTasks = false;
+      }
+    }
+
+    Integer taskToStart =
+        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
+    if (taskToStart == null) {
+      // Nothing to execute
+      // The counter is only increased when the previous evaluation did not fire anything
+      if (!triggered) {
+        nothingToTriggerCounter.inc();
+        LOG.debug("Nothing to execute at {} for collected: {}", current, 
accumulatedChanges);
+      } else {
+        LOG.debug("Execution check finished");
+      }
+
+      // Next time start from the beginning
+      startsFrom = 0;
+      triggered = false;
+      return;
+    }
+
+    if (lock.tryLock()) {
+      TableChange change = accumulatedChanges.get(taskToStart);
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      out.collect(Trigger.create(current, table, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, 
change, table.name());
+      triggerCounters.get(taskToStart).inc();
+      accumulatedChanges.set(taskToStart, TableChange.empty());
+      lastTriggerTimes.set(taskToStart, current);
+      schedule(timerService, current + minFireDelayMs);
+      // Continue with the following task next time to keep the scheduling fair
+      startsFrom = (taskToStart + 1) % evaluators.size();
+      triggered = true;
+    } else {
+      // A task is already running, waiting for it to finish
+      LOG.info("Failed to acquire lock. Delaying task to {}", current + 
lockCheckDelayMs);
+
+      // Retry the very same task on the next evaluation
+      startsFrom = taskToStart;
+      concurrentRunThrottledCounter.inc();
+      schedule(timerService, current + lockCheckDelayMs);
+    }
+
+    // NOTE(review): both branches above already called schedule(), which registers a timer for
+    // nextEvaluationTime; this registers the same timestamp again and looks redundant - confirm.
+    timerService.registerProcessingTimeTimer(nextEvaluationTime);
+  }
+
+  /**
+   * Remembers {@code time} as the next evaluation time and registers a processing time timer for
+   * it.
+   */
+  private void schedule(TimerService timerService, long time) {
+    this.nextEvaluationTime = time;
+    timerService.registerProcessingTimeTimer(time);
+  }
+
+  /**
+   * Finds the first task whose evaluator accepts its accumulated change. The search starts at
+   * {@code startPos}, wraps around at the end of the list and visits every task exactly once.
+   *
+   * @param evaluators the evaluators, one per task
+   * @param changes the accumulated changes, indexed like {@code evaluators}
+   * @param lastTriggerTimes the last trigger times, indexed like {@code evaluators}
+   * @param currentTime the time passed to the evaluators as the current time
+   * @param startPos the index where the search starts
+   * @return the index of the task to trigger, or <code>null</code> if no evaluator matched
+   */
+  private static Integer nextTrigger(
+      List<TriggerEvaluator> evaluators,
+      List<TableChange> changes,
+      List<Long> lastTriggerTimes,
+      long currentTime,
+      int startPos) {
+    int current = startPos;
+    do {
+      if (evaluators
+          .get(current)
+          .check(changes.get(current), lastTriggerTimes.get(current), 
currentTime)) {
+        return current;
+      }
+
+      current = (current + 1) % evaluators.size();
+    } while (current != startPos);
+
+    return null;
+  }
+
+  /**
+   * Lazily loads the in-memory values from the Flink state on the first element or timer. Later
+   * calls are no-ops.
+   *
+   * <p>When the state holds no accumulated changes, an empty change and the current time are
+   * added for every task. When the job state was restored, a recovery trigger is emitted as
+   * well.
+   *
+   * @param out collector used for emitting the recovery trigger
+   * @param timerService provides the current processing time and the timers
+   */
+  private void init(Collector<Trigger> out, TimerService timerService) throws 
Exception {
+    if (!inited) {
+      long current = timerService.currentProcessingTime();
+
+      // Initialize from state
+      this.nextEvaluationTime = nextEvaluationTimeState.value();
+      this.accumulatedChanges = 
Lists.newArrayList(accumulatedChangesState.get());
+      this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get());
+
+      // Initialize if the state was empty
+      if (accumulatedChanges.isEmpty()) {
+        for (int i = 0; i < evaluators.size(); ++i) {
+          accumulatedChanges.add(TableChange.empty());
+          lastTriggerTimes.add(current);
+        }
+      }
+
+      if (shouldRestoreTasks) {
+        // When the job state is restored, there could be ongoing tasks.
+        // To prevent collision with the new triggers the following is done:
+        //  - add a recovery lock
+        //  - fire a recovery trigger
+        // This ensures that the tasks of the previous trigger are executed, 
and the lock is removed
+        // in the end. The result of the 'tryLock' is ignored as an already 
existing lock prevents
+        // collisions as well.
+        recoveryLock.tryLock();
+        out.collect(Trigger.recovery(current));
+        // Make sure an evaluation happens later even if no timer was stored in the state
+        if (nextEvaluationTime == null) {
+          schedule(timerService, current + minFireDelayMs);
+        }
+      }
+
+      inited = true;
+    }
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java
new file mode 100644
index 0000000000..36e162d4f0
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+/** Constants shared by the maintenance operator tests. Not instantiable. */
+class ConstantsForTests {
+  public static final long EVENT_TIME = 10L;
+  static final long EVENT_TIME_2 = 11L;
+  static final String DUMMY_NAME = "dummy";
+
+  private ConstantsForTests() {
+    // Do not instantiate
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java
new file mode 100644
index 0000000000..7a523035b7
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * {@link MetricReporterFactory} for tests. It always returns the same reporter instance, which
+ * stores the registered counters and gauges in static maps, so tests can read and assert the
+ * metric values through the static helper methods.
+ */
+public class MetricsReporterFactoryForTests implements MetricReporterFactory {
+  private static final TestMetricsReporter INSTANCE = new 
TestMetricsReporter();
+  // Matches ".taskmanager.<a>.<b>.<group 1>.<digits>.<GROUP_KEY>.<group 2>.<group 3>".
+  // Presumably group 1 is the operator name, group 2 the task name and group 3 the metric name
+  // - TODO confirm against the Flink metric identifier format.
+  private static final Pattern FULL_METRIC_NAME =
+      Pattern.compile(
+          "\\.taskmanager\\.[^.]+\\.[^.]+\\.([^.]+)\\.\\d+\\."
+              + TableMaintenanceMetrics.GROUP_KEY
+              + "\\.([^.]+)\\.([^.]+)");
+
+  private static Map<String, Counter> counters = Maps.newConcurrentMap();
+  private static Map<String, Gauge> gauges = Maps.newConcurrentMap();
+  private static Set<String> monitoredMetricNames;
+
+  /**
+   * Collects the string values of all fields declared by {@link TableMaintenanceMetrics} as the
+   * set of monitored metric names. Note that this (re)assigns a static field from an instance
+   * constructor. The fields are read with a <code>null</code> target, so they are assumed to be
+   * static.
+   */
+  public MetricsReporterFactoryForTests() {
+    monitoredMetricNames =
+        Arrays.stream(TableMaintenanceMetrics.class.getDeclaredFields())
+            .map(
+                f -> {
+                  try {
+                    return f.get(null).toString();
+                  } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toSet());
+  }
+
+  /** Returns the shared reporter instance; the properties are ignored. */
+  @Override
+  public MetricReporter createMetricReporter(Properties properties) {
+    return INSTANCE;
+  }
+
+  /** Forgets every counter and gauge collected so far by replacing the maps. */
+  public static void reset() {
+    counters = Maps.newConcurrentMap();
+    gauges = Maps.newConcurrentMap();
+  }
+
+  /**
+   * Returns the current value of the counter with the given simplified name, or <code>null</code>
+   * if there is no such counter.
+   */
+  public static Long counter(String name) {
+    return counterValues().get(name);
+  }
+
+  /**
+   * Returns the current value of the gauge with the given simplified name, or <code>null</code>
+   * if there is no such gauge.
+   */
+  public static Long gauge(String name) {
+    return gaugeValues().get(name);
+  }
+
+  /**
+   * Asserts that the collected gauges equal the expected ones. An expected value of -1 excludes
+   * that metric from the comparison.
+   */
+  public static void assertGauges(Map<String, Long> expected) {
+    assertThat(filter(gaugeValues(), expected)).isEqualTo(filter(expected, 
expected));
+  }
+
+  /**
+   * Asserts that the collected counters equal the expected ones. An expected value of -1 excludes
+   * that metric from the comparison.
+   */
+  public static void assertCounters(Map<String, Long> expected) {
+    assertThat(filter(counterValues(), expected)).isEqualTo(filter(expected, 
expected));
+  }
+
+  /** Snapshot of the gauge values keyed by simplified name; the values are cast to Long. */
+  private static Map<String, Long> gaugeValues() {
+    return gauges.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> longName(entry.getKey()), entry -> (Long) 
entry.getValue().getValue()));
+  }
+
+  /** Snapshot of the counter values keyed by simplified name. */
+  private static Map<String, Long> counterValues() {
+    return counters.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> longName(entry.getKey()), entry -> 
entry.getValue().getCount()));
+  }
+
+  /**
+   * Returns the entries of {@code original} except those whose key is mapped to -1 in {@code
+   * filter}. Keys missing from {@code filter} are kept.
+   */
+  private static Map<String, Long> filter(Map<String, Long> original, 
Map<String, Long> filter) {
+    return original.entrySet().stream()
+        .filter(
+            entry -> {
+              Long filterValue = filter.get(entry.getKey());
+              return filterValue == null || filterValue != -1;
+            })
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  /**
+   * Converts a full metric identifier to the simplified "group1.group2.group3" form using {@code
+   * FULL_METRIC_NAME}. Throws a RuntimeException if the identifier does not match the pattern.
+   */
+  private static String longName(String fullName) {
+    Matcher matcher = FULL_METRIC_NAME.matcher(fullName);
+    if (!matcher.matches()) {
+      throw new RuntimeException(String.format("Can't parse simplified metrics 
name %s", fullName));
+    }
+
+    return matcher.group(1) + "." + matcher.group(2) + "." + matcher.group(3);
+  }
+
+  /** Reporter storing the monitored counters and gauges of the enclosing class. */
+  private static class TestMetricsReporter implements MetricReporter {
+    @Override
+    public void open(MetricConfig config) {
+      // do nothing
+    }
+
+    @Override
+    public void close() {
+      // do nothing
+    }
+
+    /**
+     * Stores the metric under its full identifier if its name is one of the monitored names.
+     * Metrics which are neither counters nor gauges are ignored.
+     */
+    @Override
+    public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+      if (monitoredMetricNames.contains(metricName)) {
+        if (metric instanceof Counter) {
+          counters.put(group.getMetricIdentifier(metricName), (Counter) 
metric);
+        }
+
+        if (metric instanceof Gauge) {
+          gauges.put(group.getMetricIdentifier(metricName), (Gauge) metric);
+        }
+      }
+    }
+
+    @Override
+    public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+      // do nothing
+    }
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 272e0b693f..2258530865 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -20,16 +20,24 @@ package org.apache.iceberg.flink.maintenance.operator;
 
 import static 
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
 
+import java.io.File;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.iceberg.flink.FlinkCatalogFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 class OperatorTestBase {
   private static final int NUMBER_TASK_MANAGERS = 1;
   private static final int SLOTS_PER_TASK_MANAGER = 8;
+  private static final TriggerLockFactory.Lock MAINTENANCE_LOCK = new 
MemoryLock();
+  private static final TriggerLockFactory.Lock RECOVERY_LOCK = new 
MemoryLock();
 
   static final String TABLE_NAME = "test_table";
 
@@ -39,7 +47,7 @@ class OperatorTestBase {
           new MiniClusterResourceConfiguration.Builder()
               .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
               .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
-              .setConfiguration(new 
Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG))
+              .setConfiguration(config())
               .build());
 
   @RegisterExtension
@@ -48,4 +56,101 @@ class OperatorTestBase {
           "catalog",
           ImmutableMap.of("type", "iceberg", 
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"),
           "db");
+
+  private static Configuration config() {
+    Configuration config = new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG);
+    MetricOptions.forReporter(config, "test_reporter")
+        .set(MetricOptions.REPORTER_FACTORY_CLASS, 
MetricsReporterFactoryForTests.class.getName());
+    return config;
+  }
+
+  protected static TriggerLockFactory lockFactory() {
+    return new TriggerLockFactory() {
+      @Override
+      public void open() {
+        MAINTENANCE_LOCK.unlock();
+        RECOVERY_LOCK.unlock();
+      }
+
+      @Override
+      public Lock createLock() {
+        return MAINTENANCE_LOCK;
+      }
+
+      @Override
+      public Lock createRecoveryLock() {
+        return RECOVERY_LOCK;
+      }
+
+      @Override
+      public void close() {
+        // do nothing
+      }
+    };
+  }
+
+  /**
+   * Close the {@link JobClient} and wait for the job closure. If the 
savepointDir is specified, it
+   * stops the job with a savepoint.
+   *
+   * @param jobClient the job to close
+   * @param savepointDir the savepointDir to store the last savepoint. If 
<code>null</code> then
+   *     stop without a savepoint.
+   * @return configuration for restarting the job from the savepoint
+   */
+  public static Configuration closeJobClient(JobClient jobClient, File 
savepointDir) {
+    Configuration conf = new Configuration();
+    if (jobClient != null) {
+      if (savepointDir != null) {
+        // Stop with savepoint
+        jobClient.stopWithSavepoint(false, savepointDir.getPath(), 
SavepointFormatType.CANONICAL);
+        // Wait until the savepoint is created and the job has been stopped
+        Awaitility.await().until(() -> 
savepointDir.listFiles(File::isDirectory).length == 1);
+        conf.set(
+            SavepointConfigOptions.SAVEPOINT_PATH,
+            savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath());
+      } else {
+        jobClient.cancel();
+      }
+
+      // Wait until the job has been stopped
+      Awaitility.await().until(() -> 
jobClient.getJobStatus().get().isTerminalState());
+      return conf;
+    }
+
+    return null;
+  }
+
+  /**
+   * Close the {@link JobClient} and wait for the job closure.
+   *
+   * @param jobClient the job to close
+   */
+  public static void closeJobClient(JobClient jobClient) {
+    closeJobClient(jobClient, null);
+  }
+
+  private static class MemoryLock implements TriggerLockFactory.Lock {
+    boolean locked = false;
+
+    @Override
+    public boolean tryLock() {
+      if (locked) {
+        return false;
+      } else {
+        locked = true;
+        return true;
+      }
+    }
+
+    @Override
+    public boolean isHeld() {
+      return locked;
+    }
+
+    @Override
+    public void unlock() {
+      locked = false;
+    }
+  }
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java
new file mode 100644
index 0000000000..051d09d92b
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static 
org.apache.iceberg.flink.maintenance.operator.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.Test;
+
+/** Runs the common lock factory tests against {@link JdbcLockFactory}, backed by SQLite. */
+class TestJdbcLockFactory extends TestLockFactoryBase {
+  @Override
+  TriggerLockFactory lockFactory() {
+    return lockFactory("tableName");
+  }
+
+  /** Locks created for different table names can be acquired at the same time. */
+  @Test
+  void testMultiTableLock() {
+    JdbcLockFactory other = lockFactory("tableName2");
+    // NOTE(review): presumably opens 'other' on top of the resources of the already opened
+    // factory, so both use the same database - confirm against JdbcLockFactory.open.
+    other.open((JdbcLockFactory) this.lockFactory);
+    TriggerLockFactory.Lock lock1 = lockFactory.createLock();
+    TriggerLockFactory.Lock lock2 = other.createLock();
+    assertThat(lock1.tryLock()).isTrue();
+    assertThat(lock2.tryLock()).isTrue();
+  }
+
+  /**
+   * Creates a factory for the given table name. Every call uses a new JDBC URL with a random
+   * suffix, and asks the factory to initialize the lock tables.
+   */
+  private JdbcLockFactory lockFactory(String tableName) {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
+    properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+    properties.put(INIT_LOCK_TABLES_PROPERTY, "true");
+
+    return new JdbcLockFactory(
+        "jdbc:sqlite:file::memory:?ic" + 
UUID.randomUUID().toString().replace("-", ""),
+        tableName,
+        properties);
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java
new file mode 100644
index 0000000000..bf9e86f253
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Common tests for {@link TriggerLockFactory} implementations. Subclasses provide the factory
+ * under test, which is opened before and closed after every test.
+ */
+abstract class TestLockFactoryBase {
+  protected TriggerLockFactory lockFactory;
+
+  /** Creates the (not yet opened) factory under test. */
+  abstract TriggerLockFactory lockFactory();
+
+  @BeforeEach
+  void before() {
+    this.lockFactory = lockFactory();
+    lockFactory.open();
+  }
+
+  @AfterEach
+  void after() throws IOException {
+    lockFactory.close();
+  }
+
+  /** A held lock can be re-acquired neither through the same nor through another handle. */
+  @Test
+  void testTryLock() {
+    TriggerLockFactory.Lock lock1 = lockFactory.createLock();
+    TriggerLockFactory.Lock lock2 = lockFactory.createLock();
+    assertThat(lock1.tryLock()).isTrue();
+    assertThat(lock1.tryLock()).isFalse();
+    assertThat(lock2.tryLock()).isFalse();
+  }
+
+  /** After unlocking, the lock can be acquired again. */
+  @Test
+  void testUnLock() {
+    TriggerLockFactory.Lock lock = lockFactory.createLock();
+    assertThat(lock.tryLock()).isTrue();
+
+    lock.unlock();
+    assertThat(lock.tryLock()).isTrue();
+  }
+
+  /** The maintenance lock and the recovery lock do not block each other. */
+  @Test
+  void testNoConflictWithRecoveryLock() {
+    TriggerLockFactory.Lock lock1 = lockFactory.createLock();
+    TriggerLockFactory.Lock lock2 = lockFactory.createRecoveryLock();
+    assertThat(lock1.tryLock()).isTrue();
+    assertThat(lock2.tryLock()).isTrue();
+  }
+
+  /** Unlocking a lock which is not held does not fail and leaves the lock usable. */
+  @Test
+  void testDoubleUnLock() {
+    TriggerLockFactory.Lock lock = lockFactory.createLock();
+    assertThat(lock.tryLock()).isTrue();
+
+    lock.unlock();
+    lock.unlock();
+    assertThat(lock.tryLock()).isTrue();
+    assertThat(lock.tryLock()).isFalse();
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
index 876d642145..8c02601025 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.maintenance.operator;
 
-import static 
org.apache.iceberg.flink.maintenance.operator.FlinkStreamingTestUtils.closeJobClient;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -161,7 +161,8 @@ class TestMonitorSource extends OperatorTestBase {
                 }
 
                 // The first non-empty event should contain the expected value
-                return newEvent.equals(new TableChange(1, 0, size, 0L, 1));
+                return newEvent.equals(
+                    
TableChange.builder().dataFileNum(1).dataFileSize(size).commitNum(1).build());
               });
     } finally {
       closeJobClient(jobClient);
@@ -348,15 +349,18 @@ class TestMonitorSource extends OperatorTestBase {
     List<DeleteFile> deleteFiles =
         
Lists.newArrayList(table.currentSnapshot().addedDeleteFiles(table.io()).iterator());
 
-    long dataSize = dataFiles.stream().mapToLong(d -> 
d.fileSizeInBytes()).sum();
-    long deleteSize = deleteFiles.stream().mapToLong(d -> 
d.fileSizeInBytes()).sum();
-    boolean hasDelete = 
table.currentSnapshot().addedDeleteFiles(table.io()).iterator().hasNext();
-
-    return new TableChange(
-        previous.dataFileNum() + dataFiles.size(),
-        previous.deleteFileNum() + deleteFiles.size(),
-        previous.dataFileSize() + dataSize,
-        previous.deleteFileSize() + deleteSize,
-        previous.commitNum() + 1);
+    long dataSize = 
dataFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
+    long deleteSize = 
deleteFiles.stream().mapToLong(ContentFile::fileSizeInBytes).sum();
+
+    TableChange newChange = previous.copy();
+    newChange.merge(
+        TableChange.builder()
+            .dataFileNum(dataFiles.size())
+            .dataFileSize(dataSize)
+            .deleteFileNum(deleteFiles.size())
+            .deleteFileSize(deleteSize)
+            .commitNum(1)
+            .build());
+    return newChange;
   }
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
new file mode 100644
index 0000000000..55e64f3e84
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static 
org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME;
+import static 
org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME;
+import static 
org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class TestTriggerManager extends OperatorTestBase {
+  private static final long DELAY = 10L;
+  private static final String NAME_1 = "name1";
+  private static final String NAME_2 = "name2";
+  private long processingTime = 0L;
+  private TriggerLockFactory lockFactory;
+  private TriggerLockFactory.Lock lock;
+  private TriggerLockFactory.Lock recoveringLock;
+
+  /** Creates the test table, opens the lock factory and resets the locks and the metrics. */
+  @BeforeEach
+  void before() {
+    sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+
+    // Obtain and open the lock factory, and keep a handle to both kinds of locks
+    lockFactory = lockFactory();
+    lockFactory.open();
+    lock = lockFactory.createLock();
+    recoveringLock = lockFactory.createRecoveryLock();
+
+    // Release both locks and clear the collected metrics before the test body runs
+    lock.unlock();
+    recoveringLock.unlock();
+    MetricsReporterFactoryForTests.reset();
+  }
+
+  /** Closes the lock factory which was opened in {@link #before()}. */
+  @AfterEach
+  void after() throws IOException {
+    this.lockFactory.close();
+  }
+
+  /** Checks the triggers emitted when the evaluator is configured with a commit number of 3. */
+  @Test
+  void testCommitNumber() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerEvaluator evaluator = new TriggerEvaluator.Builder().commitNumber(3).build();
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+        harness(manager(tableLoader, evaluator))) {
+      operatorHarness.open();
+
+      // A single commit is not enough, the next two commits push the sum to the threshold
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 0);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+
+      // Events at or above the threshold fire on their own
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(3).build(), 2);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(10).build(), 3);
+
+      // Two single commits: no new trigger
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 3);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 3);
+
+      // The third single commit fires again
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(1).build(), 4);
+    }
+  }
+
+  /** Checks the triggers emitted when the evaluator is configured with a file number of 3. */
+  @Test
+  void testFileNumber() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerEvaluator evaluator = new TriggerEvaluator.Builder().fileNumber(3).build();
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+        harness(manager(tableLoader, evaluator))) {
+      operatorHarness.open();
+
+      // One data file only: nothing is emitted
+      addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileNum(1).build(), 0);
+
+      // Data and delete files are counted together
+      TableChange oneDataOneDelete = TableChange.builder().dataFileNum(1).deleteFileNum(1).build();
+      addEventAndCheckResult(operatorHarness, oneDataOneDelete, 1);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(3).build(), 2);
+      TableChange manyFiles = TableChange.builder().dataFileNum(5).deleteFileNum(7).build();
+      addEventAndCheckResult(operatorHarness, manyFiles, 3);
+
+      // Two single files: no new trigger
+      addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileNum(1).build(), 3);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 3);
+
+      // The third single file fires again
+      addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileNum(1).build(), 4);
+    }
+  }
+
+  /** Checks the triggers emitted when the evaluator is configured with a file size of 3. */
+  @Test
+  void testFileSize() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerEvaluator evaluator = new TriggerEvaluator.Builder().fileSize(3).build();
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+        harness(manager(tableLoader, evaluator))) {
+      operatorHarness.open();
+
+      // A data file size of one only: nothing is emitted
+      addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileSize(1L).build(), 0);
+
+      // Data and delete file sizes are counted together
+      TableChange smallChange = TableChange.builder().dataFileSize(1L).deleteFileSize(1L).build();
+      addEventAndCheckResult(operatorHarness, smallChange, 1);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileSize(3L).build(), 2);
+      TableChange bigChange = TableChange.builder().dataFileSize(5L).deleteFileSize(7L).build();
+      addEventAndCheckResult(operatorHarness, bigChange, 3);
+
+      // Two changes with a size of one: no new trigger
+      addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileSize(1L).build(), 3);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileSize(1L).build(), 3);
+
+      // The third change with a size of one fires again
+      addEventAndCheckResult(operatorHarness, TableChange.builder().dataFileSize(1L).build(), 4);
+    }
+  }
+
+  /** Checks the triggers emitted when the evaluator is configured with a delete file number of 3. */
+  @Test
+  void testDeleteFileNumber() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerEvaluator evaluator = new TriggerEvaluator.Builder().deleteFileNumber(3).build();
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+        harness(manager(tableLoader, evaluator))) {
+      operatorHarness.open();
+
+      // Three data files with a single delete file do not fire
+      TableChange mostlyDataFiles = TableChange.builder().dataFileNum(3).deleteFileNum(1).build();
+      addEventAndCheckResult(operatorHarness, mostlyDataFiles, 0);
+
+      // Two more delete files push the sum to the threshold
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(2).build(), 1);
+
+      // Events at or above the threshold fire on their own
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(3).build(), 2);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(10).build(), 3);
+
+      // Two single delete files: no new trigger
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 3);
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 3);
+
+      // The third single delete file fires again
+      addEventAndCheckResult(operatorHarness, TableChange.builder().deleteFileNum(1).build(), 4);
+    }
+  }
+
+  /** Checks the triggers emitted when the evaluator is configured with a timeout of one second. */
+  @Test
+  void testTimeout() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    Duration timeout = Duration.ofSeconds(1);
+    long timeoutMs = timeout.toMillis();
+    TriggerEvaluator evaluator = new TriggerEvaluator.Builder().timeout(timeout).build();
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+        harness(manager(tableLoader, evaluator))) {
+      operatorHarness.open();
+
+      TableChange event = TableChange.builder().dataFileSize(1).commitNum(1).build();
+
+      // The first event arrives before the timeout, so there is no output
+      operatorHarness.processElement(event, EVENT_TIME);
+      assertThat(operatorHarness.extractOutputValues()).isEmpty();
+
+      // Move the clock to the end of the timeout and expect the first trigger
+      long firstExpiry = EVENT_TIME + timeoutMs;
+      operatorHarness.setProcessingTime(firstExpiry);
+      operatorHarness.processElement(event, firstExpiry);
+      assertThat(operatorHarness.extractOutputValues()).hasSize(1);
+
+      // Remove the lock to allow the next trigger
+      lock.unlock();
+
+      // One millisecond later a new event does not fire yet
+      operatorHarness.setProcessingTime(firstExpiry + 1);
+      operatorHarness.processElement(event, firstExpiry);
+      assertThat(operatorHarness.extractOutputValues()).hasSize(1);
+
+      // After another full timeout period the second trigger arrives
+      long secondExpiry = firstExpiry + timeoutMs;
+      operatorHarness.setProcessingTime(secondExpiry);
+      operatorHarness.processElement(event, secondExpiry);
+      assertThat(operatorHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  /**
+   * Checks that the change accumulated before a checkpoint is still present after the state is
+   * restored: the restored operator first emits a recovery trigger, then the real trigger.
+   */
+  @Test
+  void testStateRestore() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerManager manager = manager(tableLoader);
+    OperatorSubtaskState state;
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      // A single commit is below the commit number of 2 used by manager(TableLoader)
+      testHarness.processElement(
+          TableChange.builder().dataFileSize(1).commitNum(1).build(), EVENT_TIME);
+
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+
+      state = testHarness.snapshot(1, EVENT_TIME);
+    }
+
+    // Restore the state into a new operator and send one more change
+    manager = manager(tableLoader);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.initializeState(state);
+      testHarness.open();
+
+      // Arrives the first real change which triggers the recovery process
+      testHarness.processElement(TableChange.builder().commitNum(1).build(), EVENT_TIME_2);
+      // assertTriggers is declared as (expected, actual), so pass the expectation first
+      assertTriggers(
+          Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime())),
+          testHarness.extractOutputValues());
+
+      // Remove the lock to allow the next trigger
+      recoveringLock.unlock();
+      testHarness.setProcessingTime(EVENT_TIME_2);
+      // At this point the output contains the recovery trigger and the real trigger
+      assertThat(testHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  /** Checks that a second trigger is held back until the minimum fire delay has passed. */
+  @Test
+  void testMinFireDelay() throws Exception {
+    TriggerManager manager = manager(sql.tableLoader(TABLE_NAME), DELAY, 1);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+        harness(manager)) {
+      operatorHarness.open();
+
+      // The first event fires immediately
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+      long firstFireTime = operatorHarness.getProcessingTime();
+
+      // No new fire yet
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+
+      // Check that the trigger fired after the delay
+      operatorHarness.setProcessingTime(firstFireTime + DELAY);
+      assertThat(operatorHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  /** Checks that a trigger blocked by the lock is emitted once the lock check delay has passed. */
+  @Test
+  void testLockCheckDelay() throws Exception {
+    TriggerManager manager = manager(sql.tableLoader(TABLE_NAME), 1, DELAY);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> operatorHarness =
+        harness(manager)) {
+      operatorHarness.open();
+
+      // The first event fires immediately
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+
+      // Create a lock to prevent execution, and check that there is no result
+      boolean acquired = lock.tryLock();
+      assertThat(acquired).isTrue();
+      addEventAndCheckResult(operatorHarness, TableChange.builder().commitNum(2).build(), 1);
+      long blockedAt = operatorHarness.getProcessingTime();
+
+      // Remove the lock, and still no trigger
+      lock.unlock();
+      assertThat(operatorHarness.extractOutputValues()).hasSize(1);
+
+      // Check that the trigger fired after the delay
+      operatorHarness.setProcessingTime(blockedAt + DELAY);
+      assertThat(operatorHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  /**
+   * Simulating recovery scenarios where there is a leftover table lock, and ongoing maintenance
+   * task.
+   *
+   * <p>The restored operator has to emit a single recovery trigger first, and the real trigger
+   * only after both the lock and the recovery lock are released.
+   *
+   * @param locked if a lock exists on the table on job recovery
+   * @param runningTask is running and continues to run after job recovery
+   */
+  @ParameterizedTest
+  @MethodSource("parametersForTestRecovery")
+  void testRecovery(boolean locked, boolean runningTask) throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerManager manager = manager(tableLoader);
+    OperatorSubtaskState state;
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+      state = testHarness.snapshot(1, EVENT_TIME);
+    }
+
+    if (locked) {
+      assertThat(lock.tryLock()).isTrue();
+    }
+
+    manager = manager(tableLoader);
+    List<Trigger> expected = Lists.newArrayListWithExpectedSize(3);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.initializeState(state);
+      testHarness.open();
+
+      // The first change after the restore emits the recovery trigger.
+      // assertTriggers is declared as (expected, actual), so the expectation is passed first.
+      ++processingTime;
+      expected.add(Trigger.recovery(processingTime));
+      testHarness.setProcessingTime(processingTime);
+      testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime);
+      assertTriggers(expected, testHarness.extractOutputValues());
+
+      // Nothing happens until the recovery is finished
+      ++processingTime;
+      testHarness.setProcessingTime(processingTime);
+      assertTriggers(expected, testHarness.extractOutputValues());
+
+      if (runningTask) {
+        // Simulate the action of the recovered maintenance task lock removal when it finishes
+        lock.unlock();
+      }
+
+      // Still no results as the recovery is ongoing
+      ++processingTime;
+      testHarness.setProcessingTime(processingTime);
+      testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime);
+      assertTriggers(expected, testHarness.extractOutputValues());
+
+      // Simulate the action of removing lock and recoveryLock by downstream lock cleaner when it
+      // received recovery trigger
+      lock.unlock();
+      recoveringLock.unlock();
+
+      // Emit only a single trigger
+      ++processingTime;
+      testHarness.setProcessingTime(processingTime);
+      // Releasing lock will create a new snapshot, and we receive this in the trigger
+      expected.add(
+          Trigger.create(
+              processingTime,
+              (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()),
+              0));
+      assertTriggers(expected, testHarness.extractOutputValues());
+    }
+  }
+
+  /**
+   * Runs the manager with two tasks (commit numbers 2 and 4) in a real job, and checks the
+   * NOTHING_TO_TRIGGER and the per-task TRIGGERED counters.
+   */
+  @Test
+  void testTriggerMetrics() throws Exception {
+    // Metric identifiers checked by this test
+    String nothingKey = DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER;
+    String firstTaskKey = DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED;
+    String secondTaskKey = DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED;
+    Duration pollTimeout = Duration.ofSeconds(5);
+
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    List<String> taskNames = Lists.newArrayList(NAME_1, NAME_2);
+    List<TriggerEvaluator> evaluators =
+        Lists.newArrayList(
+            new TriggerEvaluator.Builder().commitNumber(2).build(),
+            new TriggerEvaluator.Builder().commitNumber(4).build());
+    TriggerManager manager =
+        new TriggerManager(tableLoader, lockFactory, taskNames, evaluators, 1L, 1L);
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .process(manager)
+        .name(DUMMY_NAME)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // This one doesn't trigger - tests NOTHING_TO_TRIGGER
+      source.sendRecord(TableChange.builder().commitNum(1).build());
+
+      // The counter value could be missing (null) at the beginning, so compare null-safely
+      Awaitility.await()
+          .until(
+              () -> {
+                Long nothingCounter = MetricsReporterFactoryForTests.counter(nothingKey);
+                return Long.valueOf(1L).equals(nothingCounter);
+              });
+
+      // Trigger one of the tasks - tests TRIGGERED
+      source.sendRecord(TableChange.builder().commitNum(1).build());
+      // Wait until we receive the trigger
+      assertThat(sink.poll(pollTimeout)).isNotNull();
+      assertThat(MetricsReporterFactoryForTests.counter(firstTaskKey)).isEqualTo(1L);
+      lock.unlock();
+
+      // Trigger both of the tasks - tests TRIGGERED
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      // Wait until we receive both triggers, releasing the lock after each of them
+      assertThat(sink.poll(pollTimeout)).isNotNull();
+      lock.unlock();
+      assertThat(sink.poll(pollTimeout)).isNotNull();
+      lock.unlock();
+      assertThat(MetricsReporterFactoryForTests.counter(firstTaskKey)).isEqualTo(2L);
+      assertThat(MetricsReporterFactoryForTests.counter(secondTaskKey)).isEqualTo(1L);
+
+      // Final check all the counters
+      MetricsReporterFactoryForTests.assertCounters(
+          new ImmutableMap.Builder<String, Long>()
+              .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, -1L)
+              .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, -1L)
+              .put(firstTaskKey, 2L)
+              .put(secondTaskKey, 1L)
+              .put(nothingKey, 1L)
+              .build());
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  /**
+   * Runs the manager in a real job with a high minimum fire delay, and checks that the second
+   * trigger attempt is counted by the RATE_LIMITER_TRIGGERED counter.
+   */
+  @Test
+  void testRateLimiterMetrics() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    // High delay, so only triggered once
+    TriggerManager manager = manager(tableLoader, 1_000_000L, 1L);
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .process(manager)
+        .name(DUMMY_NAME)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // Start the first trigger
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      // Remove the lock to allow the next trigger
+      lock.unlock();
+
+      // The second trigger will be blocked
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      Awaitility.await()
+          .until(
+              () -> {
+                // Compare null-safely: an exception thrown from the condition would abort the
+                // wait instead of letting it poll again
+                Long rateLimited =
+                    MetricsReporterFactoryForTests.counter(
+                        DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED);
+                return rateLimited != null && rateLimited.equals(1L);
+              });
+
+      // Final check all the counters
+      assertCounters(1L, 0L);
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  /**
+   * Runs the manager in a real job without releasing the lock after the first trigger, and checks
+   * that the second trigger attempt is counted by the CONCURRENT_RUN_THROTTLED counter.
+   */
+  @Test
+  void testConcurrentRunMetrics() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    // High lock check delay, so only triggered once
+    TriggerManager manager = manager(tableLoader, 1L, 1_000_000L);
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .process(manager)
+        .name(DUMMY_NAME)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // Start the first trigger - notice that we do not remove the lock after the trigger
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      // The second trigger will be blocked by the lock
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      Awaitility.await()
+          .until(
+              () -> {
+                // Compare null-safely: an exception thrown from the condition would abort the
+                // wait instead of letting it poll again
+                Long throttled =
+                    MetricsReporterFactoryForTests.counter(
+                        DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED);
+                return throttled != null && throttled.equals(1L);
+              });
+
+      // Final check all the counters
+      assertCounters(0L, 1L);
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  /**
+   * Provides every {@code (locked, runningTask)} combination for {@code testRecovery}, so that
+   * the case where the table is locked and the task is still running is covered too.
+   */
+  private static Stream<Arguments> parametersForTestRecovery() {
+    return Stream.of(
+        Arguments.of(true, true),
+        Arguments.of(true, false),
+        Arguments.of(false, true),
+        Arguments.of(false, false));
+  }
+
+  /**
+   * Checks the counters of a run where the first task was triggered exactly once and
+   * NOTHING_TO_TRIGGER stayed at zero.
+   *
+   * @param rateLimiterTrigger expected value of the RATE_LIMITER_TRIGGERED counter
+   * @param concurrentRunTrigger expected value of the CONCURRENT_RUN_THROTTLED counter
+   */
+  private void assertCounters(long rateLimiterTrigger, long concurrentRunTrigger) {
+    // Prefix shared by the counters which do not belong to a specific task
+    String defaultGroup = DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + ".";
+
+    ImmutableMap.Builder<String, Long> expectedCounters = new ImmutableMap.Builder<>();
+    expectedCounters.put(defaultGroup + RATE_LIMITER_TRIGGERED, rateLimiterTrigger);
+    expectedCounters.put(defaultGroup + CONCURRENT_RUN_THROTTLED, concurrentRunTrigger);
+    expectedCounters.put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 1L);
+    expectedCounters.put(defaultGroup + NOTHING_TO_TRIGGER, 0L);
+
+    MetricsReporterFactoryForTests.assertCounters(expectedCounters.build());
+  }
+
+  /**
+   * Wraps the manager into a keyed operator test harness which assigns the constant key {@code
+   * true} to every record.
+   */
+  private KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> harness(
+      TriggerManager manager) throws Exception {
+    KeyedProcessOperator<Boolean, TableChange, Trigger> operator =
+        new KeyedProcessOperator<>(manager);
+    return new KeyedOneInputStreamOperatorTestHarness<>(operator, value -> true, Types.BOOLEAN);
+  }
+
+  /**
+   * Advances the processing time by one, sends the event at the new time, checks the number of
+   * records emitted so far, then releases the lock.
+   *
+   * @param testHarness harness which receives the event
+   * @param event change to send
+   * @param expectedSize expected number of output records after the event is processed
+   */
+  private void addEventAndCheckResult(
+      OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness,
+      TableChange event,
+      int expectedSize)
+      throws Exception {
+    processingTime += 1;
+    long now = processingTime;
+    testHarness.setProcessingTime(now);
+    testHarness.processElement(event, now);
+
+    List<Trigger> emitted = testHarness.extractOutputValues();
+    assertThat(emitted).hasSize(expectedSize);
+
+    // Remove the lock to allow the next trigger
+    lock.unlock();
+  }
+
+  /** Creates a manager for the single task {@code NAME_1} with the given evaluator and delays of 1. */
+  private TriggerManager manager(TableLoader tableLoader, TriggerEvaluator evaluator) {
+    List<String> taskNames = Lists.newArrayList(NAME_1);
+    List<TriggerEvaluator> evaluators = Lists.newArrayList(evaluator);
+    return new TriggerManager(tableLoader, lockFactory, taskNames, evaluators, 1, 1);
+  }
+
+  /**
+   * Creates a manager for the single task {@code NAME_1} which uses a commit number of 2 and the
+   * given delays.
+   *
+   * @param tableLoader loader of the table
+   * @param minFireDelayMs value passed to the manager as the minimum fire delay
+   * @param lockCheckDelayMs value passed to the manager as the lock check delay
+   */
+  private TriggerManager manager(
+      TableLoader tableLoader, long minFireDelayMs, long lockCheckDelayMs) {
+    List<String> taskNames = Lists.newArrayList(NAME_1);
+    List<TriggerEvaluator> evaluators =
+        Lists.newArrayList(new TriggerEvaluator.Builder().commitNumber(2).build());
+    return new TriggerManager(
+        tableLoader, lockFactory, taskNames, evaluators, minFireDelayMs, lockCheckDelayMs);
+  }
+
+  /** Creates a manager for the single task {@code NAME_1} which uses a commit number of 2. */
+  private TriggerManager manager(TableLoader tableLoader) {
+    TriggerEvaluator everySecondCommit = new TriggerEvaluator.Builder().commitNumber(2).build();
+    return manager(tableLoader, everySecondCommit);
+  }
+
+  /**
+   * Asserts that the two lists have the same size, and that the triggers at the same position
+   * agree on the timestamp, the task id, the recovery flag and the snapshot ids of their tables.
+   *
+   * @param expected triggers which should have been emitted
+   * @param actual triggers which were emitted
+   */
+  private static void assertTriggers(List<Trigger> expected, List<Trigger> actual) {
+    assertThat(actual).hasSize(expected.size());
+    for (int i = 0; i < expected.size(); ++i) {
+      Trigger expectedTrigger = expected.get(i);
+      Trigger actualTrigger = actual.get(i);
+      assertThat(actualTrigger.timestamp()).isEqualTo(expectedTrigger.timestamp());
+      assertThat(actualTrigger.taskId()).isEqualTo(expectedTrigger.taskId());
+      assertThat(actualTrigger.isRecovery()).isEqualTo(expectedTrigger.isRecovery());
+      if (expectedTrigger.table() == null) {
+        assertThat(actualTrigger.table()).isNull();
+      } else {
+        // Fail with an assertion instead of a NullPointerException when the table is missing
+        assertThat(actualTrigger.table()).isNotNull();
+        Iterator<Snapshot> expectedSnapshots = expectedTrigger.table().snapshots().iterator();
+        Iterator<Snapshot> actualSnapshots = actualTrigger.table().snapshots().iterator();
+        while (expectedSnapshots.hasNext()) {
+          assertThat(actualSnapshots.hasNext()).isTrue();
+          assertThat(actualSnapshots.next().snapshotId())
+              .isEqualTo(expectedSnapshots.next().snapshotId());
+        }
+
+        // The actual table must not have snapshots beyond the expected ones
+        assertThat(actualSnapshots.hasNext()).isFalse();
+      }
+    }
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
 
b/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000000..952255a52b
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests


Reply via email to