This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c7d3ef4436 Flink: Maintenance - MonitorSource (#10308)
c7d3ef4436 is described below

commit c7d3ef44367c46875f0367d312372fb2f246ae73
Author: pvary <[email protected]>
AuthorDate: Thu Jun 6 09:51:25 2024 +0200

    Flink: Maintenance - MonitorSource (#10308)
---
 .../flink/maintenance/operator/MonitorSource.java  | 206 ++++++++++++
 .../operator/SingleThreadedIteratorSource.java     | 197 +++++++++++
 .../flink/maintenance/operator/TableChange.java    | 133 ++++++++
 .../flink/maintenance/operator/CollectingSink.java | 115 +++++++
 .../maintenance/operator/FlinkSqlExtension.java    | 132 ++++++++
 .../operator/FlinkStreamingTestUtils.java          |  73 +++++
 .../flink/maintenance/operator/ManualSource.java   | 316 ++++++++++++++++++
 .../maintenance/operator/OperatorTestBase.java     |  51 +++
 .../maintenance/operator/TestMonitorSource.java    | 362 +++++++++++++++++++++
 9 files changed, 1585 insertions(+)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
new file mode 100644
index 0000000000..d74b2349b1
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Monitors an Iceberg table for changes */
+@Internal
+public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
+  private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);
+
+  private final TableLoader tableLoader;
+  private final RateLimiterStrategy rateLimiterStrategy;
+  private final long maxReadBack;
+
+  /**
+   * Creates a {@link org.apache.flink.api.connector.source.Source} which monitors an Iceberg table
+   * for changes.
+   *
+   * @param tableLoader used for accessing the table
+   * @param rateLimiterStrategy limits the frequency the table is checked
+   * @param maxReadBack sets the number of snapshots read before stopping change collection
+   */
+  public MonitorSource(
+      TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+    Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should not be null");
+    Preconditions.checkArgument(maxReadBack > 0, "Need to read at least 1 snapshot to work");
+
+    this.tableLoader = tableLoader;
+    this.rateLimiterStrategy = rateLimiterStrategy;
+    this.maxReadBack = maxReadBack;
+  }
+
+  @Override
+  public Boundedness getBoundedness() {
+    return Boundedness.CONTINUOUS_UNBOUNDED;
+  }
+
+  @Override
+  public TypeInformation<TableChange> getProducedType() {
+    return TypeInformation.of(TableChange.class);
+  }
+
+  @Override
+  Iterator<TableChange> createIterator() {
+    return new TableChangeIterator(tableLoader, null, maxReadBack);
+  }
+
+  @Override
+  SimpleVersionedSerializer<Iterator<TableChange>> iteratorSerializer() {
+    return new TableChangeIteratorSerializer(tableLoader, maxReadBack);
+  }
+
+  @Override
+  public SourceReader<TableChange, GlobalSplit<TableChange>> createReader(
+      SourceReaderContext readerContext) throws Exception {
+    RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(1);
+    return new RateLimitedSourceReader<>(super.createReader(readerContext), rateLimiter);
+  }
+
+  /** The Iterator which returns the latest changes on an Iceberg table. */
+  @VisibleForTesting
+  static class TableChangeIterator implements Iterator<TableChange> {
+    private Long lastSnapshotId;
+    private final long maxReadBack;
+    private final Table table;
+
+    TableChangeIterator(TableLoader tableLoader, Long lastSnapshotId, long maxReadBack) {
+      this.lastSnapshotId = lastSnapshotId;
+      this.maxReadBack = maxReadBack;
+      tableLoader.open();
+      this.table = tableLoader.loadTable();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return true;
+    }
+
+    @Override
+    public TableChange next() {
+      try {
+        table.refresh();
+        Snapshot currentSnapshot = table.currentSnapshot();
+        Long current = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
+        Long checking = current;
+        TableChange event = TableChange.empty();
+        long readBack = 0;
+        while (checking != null && !checking.equals(lastSnapshotId) && ++readBack <= maxReadBack) {
+          Snapshot snapshot = table.snapshot(checking);
+          if (snapshot != null) {
+            if (!DataOperations.REPLACE.equals(snapshot.operation())) {
+              LOG.debug("Reading snapshot {}", snapshot.snapshotId());
+              event.merge(new TableChange(snapshot, table.io()));
+            } else {
+              LOG.debug("Skipping replace snapshot {}", snapshot.snapshotId());
+            }
+
+            checking = snapshot.parentId();
+          } else {
+            // If the last snapshot has been removed from the history
+            checking = null;
+          }
+        }
+
+        lastSnapshotId = current;
+        return event;
+      } catch (Exception e) {
+        LOG.warn("Failed to fetch table changes for {}", table, e);
+        return TableChange.empty();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("lastSnapshotId", lastSnapshotId)
+          .add("maxReadBack", maxReadBack)
+          .add("table", table)
+          .toString();
+    }
+  }
+
+  private static final class TableChangeIteratorSerializer
+      implements SimpleVersionedSerializer<Iterator<TableChange>> {
+
+    private static final int CURRENT_VERSION = 1;
+    private final TableLoader tableLoader;
+    private final long maxReadBack;
+
+    TableChangeIteratorSerializer(TableLoader tableLoader, long maxReadBack) {
+      this.tableLoader = tableLoader;
+      this.maxReadBack = maxReadBack;
+    }
+
+    @Override
+    public int getVersion() {
+      return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(Iterator<TableChange> iterator) throws IOException {
+      Preconditions.checkArgument(
+          iterator instanceof TableChangeIterator,
+          "Use TableChangeIterator iterator. Found incompatible type: %s",
+          iterator.getClass());
+
+      TableChangeIterator tableChangeIterator = (TableChangeIterator) iterator;
+      DataOutputSerializer out = new DataOutputSerializer(8);
+      long toStore =
+          tableChangeIterator.lastSnapshotId != null ? tableChangeIterator.lastSnapshotId : -1L;
+      out.writeLong(toStore);
+      return out.getCopyOfBuffer();
+    }
+
+    @Override
+    public TableChangeIterator deserialize(int version, byte[] serialized) throws IOException {
+      if (version == CURRENT_VERSION) {
+        DataInputDeserializer in = new DataInputDeserializer(serialized);
+        long fromStore = in.readLong();
+        return new TableChangeIterator(
+            tableLoader, fromStore != -1 ? fromStore : null, maxReadBack);
+      } else {
+        throw new IOException("Unrecognized version or corrupt state: " + version);
+      }
+    }
+  }
+}
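
A minimal wiring sketch for the new source, assuming an already opened TableLoader named tableLoader (this mirrors the usage in TestMonitorSource further below); the source checks that it runs with parallelism 1, so the stream is forced non-parallel:

    // Sketch only; tableLoader is assumed to be an opened TableLoader.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<TableChange> changes =
        env.fromSource(
                new MonitorSource(tableLoader, RateLimiterStrategy.perSecond(0.1), Long.MAX_VALUE),
                WatermarkStrategy.noWatermarks(),
                "TableChangeSource")
            .forceNonParallel(); // MonitorSource requires parallelism 1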
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
new file mode 100644
index 0000000000..20c7684d97
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Implementation of the Source V2 API which uses an iterator to read the elements, and uses a
+ * single thread to do so.
+ *
+ * @param <T> The return type of the source
+ */
+@Internal
+public abstract class SingleThreadedIteratorSource<T>
+    implements Source<
+            T,
+            SingleThreadedIteratorSource.GlobalSplit<T>,
+            Collection<SingleThreadedIteratorSource.GlobalSplit<T>>>,
+        ResultTypeQueryable<T> {
+  private static final String PARALLELISM_ERROR = "Parallelism should be set to 1";
+
+  /**
+   * Creates the iterator to return the elements which are then emitted by the source.
+   *
+   * @return iterator for the elements
+   */
+  abstract Iterator<T> createIterator();
+
+  /**
+   * Serializes the iterator, which is used to save and restore the state of the source.
+   *
+   * @return serializer for the iterator
+   */
+  abstract SimpleVersionedSerializer<Iterator<T>> iteratorSerializer();
+
+  @Override
+  public SplitEnumerator<GlobalSplit<T>, Collection<GlobalSplit<T>>> createEnumerator(
+      SplitEnumeratorContext<GlobalSplit<T>> enumContext) {
+    Preconditions.checkArgument(enumContext.currentParallelism() == 1, PARALLELISM_ERROR);
+    return new IteratorSourceEnumerator<>(
+        enumContext, ImmutableList.of(new GlobalSplit<>(createIterator())));
+  }
+
+  @Override
+  public SplitEnumerator<GlobalSplit<T>, Collection<GlobalSplit<T>>> restoreEnumerator(
+      SplitEnumeratorContext<GlobalSplit<T>> enumContext, Collection<GlobalSplit<T>> checkpoint) {
+    Preconditions.checkArgument(enumContext.currentParallelism() == 1, PARALLELISM_ERROR);
+    return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+  }
+
+  @Override
+  public SimpleVersionedSerializer<GlobalSplit<T>> getSplitSerializer() {
+    return new SplitSerializer<>(iteratorSerializer());
+  }
+
+  @Override
+  public SimpleVersionedSerializer<Collection<GlobalSplit<T>>> getEnumeratorCheckpointSerializer() {
+    return new EnumeratorSerializer<>(iteratorSerializer());
+  }
+
+  @Override
+  public SourceReader<T, GlobalSplit<T>> createReader(SourceReaderContext readerContext)
+      throws Exception {
+    Preconditions.checkArgument(readerContext.getIndexOfSubtask() == 0, PARALLELISM_ERROR);
+    return new IteratorSourceReader<>(readerContext);
+  }
+
+  /** The single split of the {@link SingleThreadedIteratorSource}. */
+  static class GlobalSplit<T> implements IteratorSourceSplit<T, Iterator<T>> {
+    private final Iterator<T> iterator;
+
+    GlobalSplit(Iterator<T> iterator) {
+      this.iterator = iterator;
+    }
+
+    @Override
+    public String splitId() {
+      return "1";
+    }
+
+    @Override
+    public Iterator<T> getIterator() {
+      return iterator;
+    }
+
+    @Override
+    public IteratorSourceSplit<T, Iterator<T>> getUpdatedSplitForIterator(
+        final Iterator<T> newIterator) {
+      return new GlobalSplit<>(newIterator);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("GlobalSplit (%s)", iterator);
+    }
+  }
+
+  private static final class SplitSerializer<T>
+      implements SimpleVersionedSerializer<GlobalSplit<T>> {
+    private final SimpleVersionedSerializer<Iterator<T>> iteratorSerializer;
+
+    SplitSerializer(SimpleVersionedSerializer<Iterator<T>> iteratorSerializer) {
+      this.iteratorSerializer = iteratorSerializer;
+    }
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+      return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(GlobalSplit<T> split) throws IOException {
+      return iteratorSerializer.serialize(split.iterator);
+    }
+
+    @Override
+    public GlobalSplit<T> deserialize(int version, byte[] serialized) throws IOException {
+      return new GlobalSplit<>(iteratorSerializer.deserialize(version, serialized));
+    }
+  }
+
+  private static final class EnumeratorSerializer<T>
+      implements SimpleVersionedSerializer<Collection<GlobalSplit<T>>> {
+    private static final int CURRENT_VERSION = 1;
+    private final SimpleVersionedSerializer<Iterator<T>> iteratorSerializer;
+
+    EnumeratorSerializer(SimpleVersionedSerializer<Iterator<T>> iteratorSerializer) {
+      this.iteratorSerializer = iteratorSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+      return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(Collection<GlobalSplit<T>> checkpoint) throws IOException {
+      Preconditions.checkArgument(checkpoint.size() < 2, PARALLELISM_ERROR);
+      if (checkpoint.isEmpty()) {
+        return new byte[] {0};
+      } else {
+        byte[] iterator = iteratorSerializer.serialize(checkpoint.iterator().next().getIterator());
+        byte[] result = new byte[iterator.length + 1];
+        result[0] = 1;
+        System.arraycopy(iterator, 0, result, 1, iterator.length);
+        return result;
+      }
+    }
+
+    @Override
+    public Collection<GlobalSplit<T>> deserialize(int version, byte[] serialized)
+        throws IOException {
+      if (serialized[0] == 0) {
+        return Lists.newArrayList();
+      } else {
+        byte[] iterator = new byte[serialized.length - 1];
+        System.arraycopy(serialized, 1, iterator, 0, serialized.length - 1);
+        return Lists.newArrayList(
+            new GlobalSplit<>(iteratorSerializer.deserialize(version, iterator)));
+      }
+    }
+  }
+}
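
To illustrate the contract of the two package-private abstract methods, here is a hypothetical subclass (invented for this sketch, not part of the commit; it would have to live in the same package, with imports matching MonitorSource.java above). It emits an ever-increasing counter and checkpoints the next value, the same pattern MonitorSource uses for the last snapshot id:

    // Hypothetical example subclass of SingleThreadedIteratorSource.
    @Internal
    class CounterSource extends SingleThreadedIteratorSource<Long> {

      /** Iterator whose entire state is the next value to emit. */
      static class CounterIterator implements Iterator<Long> {
        private long next;

        CounterIterator(long next) {
          this.next = next;
        }

        @Override
        public boolean hasNext() {
          return true;
        }

        @Override
        public Long next() {
          return next++;
        }
      }

      @Override
      public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
      }

      @Override
      public TypeInformation<Long> getProducedType() {
        return TypeInformation.of(Long.class);
      }

      @Override
      Iterator<Long> createIterator() {
        return new CounterIterator(0L);
      }

      @Override
      SimpleVersionedSerializer<Iterator<Long>> iteratorSerializer() {
        return new SimpleVersionedSerializer<Iterator<Long>>() {
          @Override
          public int getVersion() {
            return 1;
          }

          @Override
          public byte[] serialize(Iterator<Long> iterator) throws IOException {
            // Persist the iterator position so a restored source resumes counting
            DataOutputSerializer out = new DataOutputSerializer(8);
            out.writeLong(((CounterIterator) iterator).next);
            return out.getCopyOfBuffer();
          }

          @Override
          public Iterator<Long> deserialize(int version, byte[] serialized) throws IOException {
            return new CounterIterator(new DataInputDeserializer(serialized).readLong());
          }
        };
      }
    }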
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
new file mode 100644
index 0000000000..452ed80ed0
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Objects;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/** Event describing changes in an Iceberg table */
+@Internal
+class TableChange {
+  private int dataFileNum;
+  private int deleteFileNum;
+  private long dataFileSize;
+  private long deleteFileSize;
+  private int commitNum;
+
+  TableChange(
+      int dataFileNum, int deleteFileNum, long dataFileSize, long deleteFileSize, int commitNum) {
+    this.dataFileNum = dataFileNum;
+    this.deleteFileNum = deleteFileNum;
+    this.dataFileSize = dataFileSize;
+    this.deleteFileSize = deleteFileSize;
+    this.commitNum = commitNum;
+  }
+
+  TableChange(Snapshot snapshot, FileIO io) {
+    Iterable<DataFile> dataFiles = snapshot.addedDataFiles(io);
+    Iterable<DeleteFile> deleteFiles = snapshot.addedDeleteFiles(io);
+
+    dataFiles.forEach(
+        dataFile -> {
+          this.dataFileNum++;
+          this.dataFileSize += dataFile.fileSizeInBytes();
+        });
+
+    deleteFiles.forEach(
+        deleteFile -> {
+          this.deleteFileNum++;
+          this.deleteFileSize += deleteFile.fileSizeInBytes();
+        });
+
+    this.commitNum = 1;
+  }
+
+  static TableChange empty() {
+    return new TableChange(0, 0, 0L, 0L, 0);
+  }
+
+  int dataFileNum() {
+    return dataFileNum;
+  }
+
+  int deleteFileNum() {
+    return deleteFileNum;
+  }
+
+  long dataFileSize() {
+    return dataFileSize;
+  }
+
+  long deleteFileSize() {
+    return deleteFileSize;
+  }
+
+  public int commitNum() {
+    return commitNum;
+  }
+
+  public void merge(TableChange other) {
+    this.dataFileNum += other.dataFileNum;
+    this.deleteFileNum += other.deleteFileNum;
+    this.dataFileSize += other.dataFileSize;
+    this.deleteFileSize += other.deleteFileSize;
+    this.commitNum += other.commitNum;
+  }
+
+  TableChange copy() {
+    return new TableChange(dataFileNum, deleteFileNum, dataFileSize, deleteFileSize, commitNum);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("dataFileNum", dataFileNum)
+        .add("deleteFileNum", deleteFileNum)
+        .add("dataFileSize", dataFileSize)
+        .add("deleteFileSize", deleteFileSize)
+        .add("commitNum", commitNum)
+        .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    TableChange that = (TableChange) other;
+    return dataFileNum == that.dataFileNum
+        && deleteFileNum == that.deleteFileNum
+        && dataFileSize == that.dataFileSize
+        && deleteFileSize == that.deleteFileSize
+        && commitNum == that.commitNum;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dataFileNum, deleteFileNum, dataFileSize, deleteFileSize, commitNum);
+  }
+}
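
A quick worked example of the merge semantics with hypothetical numbers; counts and sizes are summed, which is how TableChangeIterator above folds several snapshots into a single event:

    TableChange acc = TableChange.empty();
    acc.merge(new TableChange(2, 0, 1024L, 0L, 1)); // first commit: two data files, 1024 bytes
    acc.merge(new TableChange(1, 1, 512L, 128L, 1)); // second commit: one data and one delete file
    // acc now equals new TableChange(3, 1, 1536L, 128L, 2)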
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
new file mode 100644
index 0000000000..a49459d61a
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Sink for collecting output during testing. */
+class CollectingSink<T> implements Sink<T> {
+  private static final long serialVersionUID = 1L;
+  private static final List<BlockingQueue<Object>> queues =
+      Collections.synchronizedList(Lists.newArrayListWithExpectedSize(1));
+  private static final AtomicInteger numSinks = new AtomicInteger(-1);
+  private final int index;
+
+  /** Creates a new sink which collects the elements received. */
+  CollectingSink() {
+    this.index = numSinks.incrementAndGet();
+    queues.add(new LinkedBlockingQueue<>());
+  }
+
+  /**
+   * Gets all the remaining output received by this {@link Sink}.
+   *
+   * @return all the remaining output
+   */
+  List<T> remainingOutput() {
+    return Lists.newArrayList((BlockingQueue<T>) queues.get(this.index));
+  }
+
+  /**
+   * Check if there is no remaining output received by this {@link Sink}.
+   *
+   * @return <code>true</code> if there is no remaining output
+   */
+  boolean isEmpty() {
+    return queues.get(this.index).isEmpty();
+  }
+
+  /**
+   * Wait until the next element is received by the {@link Sink}.
+   *
+   * @param timeout for the poll
+   * @return The first element received by this {@link Sink}
+   * @throws TimeoutException if no element is received before the timeout
+   */
+  T poll(Duration timeout) throws TimeoutException {
+    Object element;
+
+    try {
+      element = queues.get(this.index).poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+    } catch (InterruptedException var4) {
+      throw new RuntimeException(var4);
+    }
+
+    if (element == null) {
+      throw new TimeoutException();
+    } else {
+      return (T) element;
+    }
+  }
+
+  @Override
+  public SinkWriter<T> createWriter(InitContext context) {
+    return new CollectingWriter<>(index);
+  }
+
+  private static class CollectingWriter<T> implements SinkWriter<T> {
+    private final int index;
+
+    CollectingWriter(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public void write(T element, Context context) {
+      queues.get(index).add(element);
+    }
+
+    @Override
+    public void flush(boolean endOfInput) {
+      // Nothing to do here
+    }
+
+    @Override
+    public void close() {
+      // Nothing to do here
+    }
+  }
+}
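
Typical use in a test, mirroring TestMonitorSource further below (the job name is a placeholder): attach the sink to the stream under test, start the job asynchronously, then poll with a timeout:

    CollectingSink<TableChange> result = new CollectingSink<>();
    events.sinkTo(result); // events is the DataStream<TableChange> under test
    JobClient jobClient = env.executeAsync("Collecting Sink Example");
    TableChange first = result.poll(Duration.ofSeconds(5L)); // TimeoutException if nothing arrives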
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java
new file mode 100644
index 0000000000..90790b373d
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * JUnit 5 extension for running Flink SQL queries. {@link
+ * org.apache.flink.test.junit5.MiniClusterExtension} is used for executing the SQL batch jobs.
+ */
+public class FlinkSqlExtension implements BeforeEachCallback, AfterEachCallback {
+  private final String catalogName;
+  private final Map<String, String> catalogProperties;
+  private final String databaseName;
+  private final Path warehouse;
+  private final CatalogLoader catalogLoader;
+  private TableEnvironment tableEnvironment;
+
+  public FlinkSqlExtension(
+      String catalogName, Map<String, String> catalogProperties, String databaseName) {
+    this.catalogName = catalogName;
+    this.catalogProperties = Maps.newHashMap(catalogProperties);
+    this.databaseName = databaseName;
+
+    // Add temporary dir as a warehouse location
+    try {
+      this.warehouse = Files.createTempDirectory("warehouse");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.catalogProperties.put(
+        CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouse));
+    this.catalogLoader =
+        CatalogLoader.hadoop(catalogName, new Configuration(), this.catalogProperties);
+  }
+
+  @Override
+  public void beforeEach(ExtensionContext context) {
+    // We need to recreate the tableEnvironment for every test as the minicluster is recreated
+    this.tableEnvironment =
+        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+    exec("CREATE CATALOG %s WITH %s", catalogName, toWithClause(catalogProperties));
+    exec("CREATE DATABASE %s.%s", catalogName, databaseName);
+    exec("USE CATALOG %s", catalogName);
+    exec("USE %s", databaseName);
+  }
+
+  @Override
+  public void afterEach(ExtensionContext context) throws IOException {
+    List<Row> tables = exec("SHOW TABLES");
+    tables.forEach(t -> exec("DROP TABLE IF EXISTS %s", t.getField(0)));
+    exec("USE CATALOG default_catalog");
+    exec("DROP CATALOG IF EXISTS %s", catalogName);
+    Files.walk(warehouse).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+  }
+
+  /**
+   * Executes an SQL query with the given parameters. The parameter substitution is done by {@link
+   * String#format(String, Object...)}.
+   *
+   * @param query to run
+   * @param parameters to substitute to the query
+   * @return The {@link Row}s returned by the query
+   */
+  public List<Row> exec(String query, Object... parameters) {
+    TableResult tableResult = tableEnvironment.executeSql(String.format(query, parameters));
+    try (CloseableIterator<Row> iter = tableResult.collect()) {
+      return Lists.newArrayList(iter);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to collect table result", e);
+    }
+  }
+
+  /**
+   * Returns the {@link TableLoader} which could be used to access the given table.
+   *
+   * @param tableName of the table
+   * @return the {@link TableLoader} for the table
+   */
+  public TableLoader tableLoader(String tableName) {
+    TableLoader tableLoader =
+        TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(databaseName, tableName));
+    tableLoader.open();
+    return tableLoader;
+  }
+
+  private static String toWithClause(Map<String, String> props) {
+    return String.format(
+        "(%s)",
+        props.entrySet().stream()
+            .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
+            .collect(Collectors.joining(",")));
+  }
+}
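
Typical registration, mirroring OperatorTestBase below (table and catalog names are examples):

    @RegisterExtension
    final FlinkSqlExtension sql =
        new FlinkSqlExtension(
            "catalog",
            ImmutableMap.of("type", "iceberg", FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"),
            "db");

    @Test
    void example() {
      sql.exec("CREATE TABLE %s (id int, data varchar)", "test_table");
      sql.exec("INSERT INTO %s VALUES (1, 'a')", "test_table");
      TableLoader tableLoader = sql.tableLoader("test_table");
    }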
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java
new file mode 100644
index 0000000000..9cdc55cb0c
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.File;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.awaitility.Awaitility;
+
+class FlinkStreamingTestUtils {
+  private FlinkStreamingTestUtils() {
+    // Do not instantiate
+  }
+
+  /**
+   * Close the {@link JobClient} and wait for the job closure. If the savepointDir is specified, it
+   * stops the job with a savepoint.
+   *
+   * @param jobClient the job to close
+   * @param savepointDir the savepointDir to store the last savepoint. If <code>null</code> then
+   *     stop without a savepoint.
+   * @return configuration for restarting the job from the savepoint
+   */
+  static Configuration closeJobClient(JobClient jobClient, File savepointDir) {
+    Configuration conf = new Configuration();
+    if (jobClient != null) {
+      if (savepointDir != null) {
+        // Stop with savepoint
+        jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL);
+        // Wait until the savepoint is created and the job has been stopped
+        Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1);
+        conf.set(
+            SavepointConfigOptions.SAVEPOINT_PATH,
+            savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath());
+      } else {
+        jobClient.cancel();
+      }
+
+      // Wait until the job has been stopped
+      Awaitility.await().until(() -> jobClient.getJobStatus().get().isTerminalState());
+      return conf;
+    }
+
+    return null;
+  }
+
+  /**
+   * Close the {@link JobClient} and wait for the job closure.
+   *
+   * @param jobClient the job to close
+   */
+  static void closeJobClient(JobClient jobClient) {
+    closeJobClient(jobClient, null);
+  }
+}
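
These helpers support the restore pattern used in TestMonitorSource below: stop the first job with a savepoint, then feed the returned configuration into a fresh environment so the rebuilt topology resumes from the saved state:

    // savepointDir is a test-provided directory, e.g. a JUnit @TempDir
    Configuration conf = closeJobClient(jobClient, savepointDir);
    StreamExecutionEnvironment restored = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    // ... rebuild the same topology on the restored environment and call executeAsync() again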
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
new file mode 100644
index 0000000000..e08742a89d
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.jetbrains.annotations.Nullable;
+
+/** Testing source implementation for Flink sources which can be triggered manually. */
+class ManualSource<T>
+    implements Source<T, ManualSource.DummySplit, ManualSource.DummyCheckpoint>,
+        ResultTypeQueryable<T> {
+
+  private static final long serialVersionUID = 1L;
+  private static final List<ArrayDeque<Tuple2<?, Long>>> queues =
+      Collections.synchronizedList(Lists.newArrayList());
+  private static final List<CompletableFuture<Void>> availabilities =
+      Collections.synchronizedList(Lists.newArrayList());
+  private static int numSources = 0;
+  private final TypeInformation<T> type;
+  private final int index;
+  private transient DataStream<T> stream;
+  private final transient StreamExecutionEnvironment env;
+
+  /**
+   * Creates a new source for testing.
+   *
+   * @param env to register the source
+   * @param type of the events returned by the source
+   */
+  ManualSource(StreamExecutionEnvironment env, TypeInformation<T> type) {
+    this.type = type;
+    this.env = env;
+    this.index = numSources++;
+    queues.add(Queues.newArrayDeque());
+    availabilities.add(new CompletableFuture<>());
+  }
+
+  /**
+   * Emit a new record from the source.
+   *
+   * @param event to emit
+   */
+  void sendRecord(T event) {
+    this.sendInternal(Tuple2.of(event, null));
+  }
+
+  /**
+   * Emit a new record with the given event time from the source.
+   *
+   * @param event to emit
+   * @param eventTime of the event
+   */
+  void sendRecord(T event, long eventTime) {
+    this.sendInternal(Tuple2.of(event, eventTime));
+  }
+
+  /**
+   * Emit a watermark from the source.
+   *
+   * @param timeStamp of the watermark
+   */
+  void sendWatermark(long timeStamp) {
+    this.sendInternal(Tuple2.of(null, timeStamp));
+  }
+
+  /** Mark the source as finished. */
+  void markFinished() {
+    this.sendWatermark(Long.MAX_VALUE);
+    this.sendInternal(Tuple2.of(null, null));
+  }
+
+  /**
+   * Get the {@link DataStream} for this source.
+   *
+   * @return the stream emitted by this source
+   */
+  DataStream<T> dataStream() {
+    if (this.stream == null) {
+      this.stream =
+          this.env
+              .fromSource(this, WatermarkStrategy.noWatermarks(), "ManualSource-" + index, type)
+              .forceNonParallel();
+    }
+
+    return this.stream;
+  }
+
+  private void sendInternal(Tuple2<?, Long> tuple) {
+    queues.get(index).offer(tuple);
+    availabilities.get(index).complete(null);
+  }
+
+  @Override
+  public Boundedness getBoundedness() {
+    return Boundedness.CONTINUOUS_UNBOUNDED;
+  }
+
+  @Override
+  public SplitEnumerator<DummySplit, DummyCheckpoint> createEnumerator(
+      SplitEnumeratorContext<DummySplit> enumContext) {
+    return new DummyCheckpointEnumerator();
+  }
+
+  @Override
+  public SplitEnumerator<DummySplit, DummyCheckpoint> restoreEnumerator(
+      SplitEnumeratorContext<DummySplit> enumContext, DummyCheckpoint checkpoint) {
+    return new DummyCheckpointEnumerator();
+  }
+
+  @Override
+  public SimpleVersionedSerializer<DummySplit> getSplitSerializer() {
+    return new NoOpDummySplitSerializer();
+  }
+
+  @Override
+  public SimpleVersionedSerializer<DummyCheckpoint> getEnumeratorCheckpointSerializer() {
+    return new NoOpDummyCheckpointSerializer();
+  }
+
+  @Override
+  public SourceReader<T, DummySplit> createReader(SourceReaderContext sourceReaderContext) {
+    return new SourceReader<T, DummySplit>() {
+      @Override
+      public void start() {
+        // Do nothing
+      }
+
+      @Override
+      public InputStatus pollNext(ReaderOutput<T> output) {
+        Tuple2<T, Long> next = (Tuple2<T, Long>) queues.get(index).poll();
+
+        if (next != null) {
+          if (next.f0 == null) {
+            // No more input
+            return InputStatus.END_OF_INPUT;
+          }
+
+          if (next.f1 == null) {
+            // No event time set
+            output.collect(next.f0);
+          } else {
+            // With event time
+            output.collect(next.f0, next.f1);
+          }
+        }
+
+        availabilities.set(index, new CompletableFuture<>());
+        return queues.get(index).isEmpty()
+            ? InputStatus.NOTHING_AVAILABLE
+            : InputStatus.MORE_AVAILABLE;
+      }
+
+      @Override
+      public List<DummySplit> snapshotState(long checkpointId) {
+        return Lists.newArrayList(new DummySplit());
+      }
+
+      @Override
+      public CompletableFuture<Void> isAvailable() {
+        return availabilities.get(index);
+      }
+
+      @Override
+      public void addSplits(List<DummySplit> splits) {
+        // do nothing
+      }
+
+      @Override
+      public void notifyNoMoreSplits() {
+        // do nothing
+      }
+
+      @Override
+      public void close() {
+        // do nothing
+      }
+    };
+  }
+
+  @Override
+  public TypeInformation<T> getProducedType() {
+    return this.type;
+  }
+
+  /**
+   * Placeholder because the ManualSource itself implicitly represents the only split and does not
+   * require an actual split object.
+   */
+  public static class DummySplit implements SourceSplit {
+    @Override
+    public String splitId() {
+      return "dummy";
+    }
+  }
+
+  /**
+   * Placeholder because the ManualSource does not support fault-tolerance and thus does not require
+   * actual checkpointing.
+   */
+  public static class DummyCheckpoint {}
+
+  /** Placeholder because the ManualSource does not need enumeration, but checkpointing needs it. */
+  private static class DummyCheckpointEnumerator
+      implements SplitEnumerator<DummySplit, DummyCheckpoint> {
+
+    @Override
+    public void start() {
+      // do nothing
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+      // do nothing
+    }
+
+    @Override
+    public void addSplitsBack(List<DummySplit> splits, int subtaskId) {
+      // do nothing
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+      // do nothing
+    }
+
+    @Override
+    public DummyCheckpoint snapshotState(long checkpointId) {
+      return new DummyCheckpoint();
+    }
+
+    @Override
+    public void close() {
+      // do nothing
+    }
+  }
+
+  /**
+   * Not used - only required to avoid NullPointerException. The split is not transferred from the
+   * enumerator, it is implicitly represented by the ManualSource.
+   */
+  private static class NoOpDummySplitSerializer implements SimpleVersionedSerializer<DummySplit> {
+    @Override
+    public int getVersion() {
+      return 0;
+    }
+
+    @Override
+    public byte[] serialize(DummySplit split) {
+      return new byte[0];
+    }
+
+    @Override
+    public DummySplit deserialize(int version, byte[] serialized) {
+      return new DummySplit();
+    }
+  }
+
+  /**
+   * Not used - only required to avoid NullPointerException. The split is not transferred from the
+   * enumerator, it is implicitly represented by the ManualSource.
+   */
+  private static class NoOpDummyCheckpointSerializer
+      implements SimpleVersionedSerializer<DummyCheckpoint> {
+    @Override
+    public int getVersion() {
+      return 0;
+    }
+
+    @Override
+    public byte[] serialize(DummyCheckpoint split) {
+      return new byte[0];
+    }
+
+    @Override
+    public DummyCheckpoint deserialize(int version, byte[] serialized) {
+      return new DummyCheckpoint();
+    }
+  }
+}
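
A hypothetical interaction from the test thread (sink and job name are placeholders): records, event times, and watermarks are pushed into the running job on demand:

    ManualSource<String> source = new ManualSource<>(env, TypeInformation.of(String.class));
    source.dataStream().sinkTo(new CollectingSink<>());
    JobClient client = env.executeAsync("ManualSource Example");
    source.sendRecord("a"); // emit without event time
    source.sendRecord("b", 100L); // emit with event time 100
    source.sendWatermark(200L); // emit a watermark
    source.markFinished(); // the reader returns END_OF_INPUT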
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
new file mode 100644
index 0000000000..269ae681b0
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class OperatorTestBase {
+  private static final int NUMBER_TASK_MANAGERS = 1;
+  private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+  static final String TABLE_NAME = "test_table";
+
+  @RegisterExtension
+  protected static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+              .setConfiguration(new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG))
+              .build());
+
+  @RegisterExtension
+  final FlinkSqlExtension sql =
+      new FlinkSqlExtension(
+          "catalog",
+          ImmutableMap.of("type", "iceberg", FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"),
+          "db");
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
new file mode 100644
index 0000000000..876d642145
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.flink.maintenance.operator.FlinkStreamingTestUtils.closeJobClient;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class TestMonitorSource extends OperatorTestBase {
+  private static final TableChange EMPTY_EVENT = TableChange.empty();
+  private static final RateLimiterStrategy HIGH_RATE = RateLimiterStrategy.perSecond(100.0);
+  private static final RateLimiterStrategy LOW_RATE = RateLimiterStrategy.perSecond(1.0 / 10000.0);
+
+  @TempDir private File checkpointDir;
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testChangeReaderIterator(boolean withDelete) {
+    if (withDelete) {
+      sql.exec(
+          "CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) WITH ('format-version'='2', 'write.upsert.enabled'='true')",
+          TABLE_NAME);
+    } else {
+      sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+    }
+
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    tableLoader.open();
+    Table table = tableLoader.loadTable();
+
+    MonitorSource.TableChangeIterator iterator =
+        new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE);
+
+    // For an empty table we get an empty result
+    assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+
+    // Add a single commit and get back the commit data in the event
+    sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+    table.refresh();
+    TableChange expected = tableChangeWithLastSnapshot(table, TableChange.empty());
+    assertThat(iterator.next()).isEqualTo(expected);
+    // Make sure that consecutive calls do not return the data again
+    assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+
+    // Add two more commits, but fetch the data in one loop
+    sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME);
+    table.refresh();
+    expected = tableChangeWithLastSnapshot(table, TableChange.empty());
+
+    sql.exec("INSERT INTO %s VALUES (3, 'c')", TABLE_NAME);
+    table.refresh();
+    expected = tableChangeWithLastSnapshot(table, expected);
+
+    assertThat(iterator.next()).isEqualTo(expected);
+    // Make sure that consecutive calls do not return the data again
+    assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+  }
+
+  /**
+   * Create a table and check that the source returns the data as new commits arrive at the table.
+   */
+  @Test
+  void testSource() throws Exception {
+    sql.exec(
+        "CREATE TABLE %s (id int, data varchar) "
+            + "WITH ('flink.max-continuous-empty-commits'='100000')",
+        TABLE_NAME);
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    tableLoader.open();
+    Table table = tableLoader.loadTable();
+    DataStream<TableChange> events =
+        env.fromSource(
+                new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE),
+                WatermarkStrategy.noWatermarks(),
+                "TableChangeSource")
+            .forceNonParallel();
+
+    // Sink to collect the results
+    CollectingSink<TableChange> result = new CollectingSink<>();
+    events.sinkTo(result);
+
+    JobClient jobClient = null;
+    try {
+      // First result is an empty event
+      jobClient = env.executeAsync("Table Change Source Test");
+      assertThat(result.poll(Duration.ofSeconds(5L))).isEqualTo(EMPTY_EVENT);
+
+      // Insert some data
+      File dataDir = new File(new Path(table.location(), "data").toUri().getPath());
+      dataDir.mkdir();
+      GenericAppenderHelper dataAppender =
+          new GenericAppenderHelper(table, FileFormat.PARQUET, dataDir.toPath());
+      List<Record> batch1 = RandomGenericData.generate(table.schema(), 2, 1);
+      dataAppender.appendToTable(batch1);
+
+      // Wait until the changes are committed
+      Awaitility.await()
+          .until(
+              () -> {
+                table.refresh();
+                return table.currentSnapshot() != null;
+              });
+
+      table.refresh();
+      long size = firstFileLength(table);
+
+      // Wait until the first non-empty event has arrived, and check the expected result
+      Awaitility.await()
+          .until(
+              () -> {
+                TableChange newEvent = result.poll(Duration.ofSeconds(5L));
+                // Fetch every empty event from the beginning
+                while (newEvent.equals(EMPTY_EVENT)) {
+                  newEvent = result.poll(Duration.ofSeconds(5L));
+                }
+
+                // The first non-empty event should contain the expected value
+                return newEvent.equals(new TableChange(1, 0, size, 0L, 1));
+              });
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  /** Check that the {@link MonitorSource} operator state is restored correctly. */
+  @Test
+  void testStateRestore(@TempDir File savepointDir) throws Exception {
+    sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+    sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+
+    Configuration config = new Configuration();
+    config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+    config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + checkpointDir.getPath());
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+    env.enableCheckpointing(1000);
+
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    tableLoader.open();
+    DataStream<TableChange> events =
+        env.fromSource(
+                new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE),
+                WatermarkStrategy.noWatermarks(),
+                "TableChangeSource")
+            .forceNonParallel();
+
+    // Sink to collect the results
+    CollectingSink<TableChange> result = new CollectingSink<>();
+    events.sinkTo(result);
+
+    // Start the job
+    Configuration conf;
+    JobClient jobClient = null;
+    AtomicReference<TableChange> firstNonEmptyEvent = new AtomicReference<>();
+    try {
+      jobClient = env.executeAsync("Table Change Source Test");
+
+      Awaitility.await()
+          .until(
+              () -> {
+                TableChange newEvent = result.poll(Duration.ofSeconds(5L));
+                // Fetch every empty event from the beginning
+                while (newEvent.equals(EMPTY_EVENT)) {
+                  newEvent = result.poll(Duration.ofSeconds(5L));
+                }
+
+                // The first non-empty event should contain the expected value
+                firstNonEmptyEvent.set(newEvent);
+                return true;
+              });
+    } finally {
+      // Stop with savepoint
+      conf = closeJobClient(jobClient, savepointDir);
+    }
+
+    // Restore from savepoint, create the same topology with a different env
+    env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+    events =
+        env.fromSource(
+                new MonitorSource(tableLoader, LOW_RATE, Long.MAX_VALUE),
+                WatermarkStrategy.noWatermarks(),
+                "TableChangeSource")
+            .forceNonParallel();
+    CollectingSink<TableChange> resultWithSavepoint = new CollectingSink<>();
+    events.sinkTo(resultWithSavepoint);
+
+    // Make sure that the job with restored source does not read new records from the table
+    JobClient clientWithSavepoint = null;
+    try {
+      clientWithSavepoint = env.executeAsync("Table Change Source test with savepoint");
+
+      assertThat(resultWithSavepoint.poll(Duration.ofSeconds(5L))).isEqualTo(EMPTY_EVENT);
+    } finally {
+      closeJobClient(clientWithSavepoint, null);
+    }
+
+    // Restore without savepoint
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+    events =
+        env.fromSource(
+                new MonitorSource(tableLoader, LOW_RATE, Long.MAX_VALUE),
+                WatermarkStrategy.noWatermarks(),
+                "TableChangeSource")
+            .forceNonParallel();
+    CollectingSink<TableChange> resultWithoutSavepoint = new CollectingSink<>();
+    events.sinkTo(resultWithoutSavepoint);
+
+    // Make sure that a new job without state reads the event as expected
+    JobClient clientWithoutSavepoint = null;
+    try {
+      clientWithoutSavepoint = env.executeAsync("Table Change Source Test without savepoint");
+      assertThat(resultWithoutSavepoint.poll(Duration.ofSeconds(5L)))
+          .isEqualTo(firstNonEmptyEvent.get());
+    } finally {
+      closeJobClient(clientWithoutSavepoint);
+    }
+  }
+
+  @Test
+  void testNotOneParallelismThrows() {
+    sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    tableLoader.open();
+
+    env.fromSource(
+            new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE),
+            WatermarkStrategy.noWatermarks(),
+            "TableChangeSource")
+        .setParallelism(2)
+        .print();
+
+    assertThatThrownBy(env::execute)
+        .isInstanceOf(JobExecutionException.class)
+        .rootCause()
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Parallelism should be set to 1");
+  }
+
+  @Test
+  void testMaxReadBack() {
+    sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+    sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+    sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME);
+    sql.exec("INSERT INTO %s VALUES (3, 'c')", TABLE_NAME);
+
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    tableLoader.open();
+
+    MonitorSource.TableChangeIterator iterator =
+        new MonitorSource.TableChangeIterator(tableLoader, null, 1);
+
+    // For a single maxReadBack we only get a single change
+    assertThat(iterator.next().commitNum()).isEqualTo(1);
+
+    iterator = new MonitorSource.TableChangeIterator(tableLoader, null, 2);
+
+    // Expecting 2 commits/snapshots for maxReadBack=2
+    assertThat(iterator.next().commitNum()).isEqualTo(2);
+
+    iterator = new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE);
+
+    // For maxReadBack Long.MAX_VALUE we get every change
+    assertThat(iterator.next().commitNum()).isEqualTo(3);
+  }
+
+  @Test
+  void testSkipReplace() {
+    sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+    sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    tableLoader.open();
+
+    MonitorSource.TableChangeIterator iterator =
+        new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE);
+
+    // Read the current snapshot
+    assertThat(iterator.next().commitNum()).isEqualTo(1);
+
+    // Create a DataOperations.REPLACE snapshot
+    Table table = tableLoader.loadTable();
+    DataFile dataFile =
+        table.snapshots().iterator().next().addedDataFiles(table.io()).iterator().next();
+    RewriteFiles rewrite = tableLoader.loadTable().newRewrite();
+    // Replace the file with itself for testing purposes
+    rewrite.deleteFile(dataFile);
+    rewrite.addFile(dataFile);
+    rewrite.commit();
+
+    // Check that the rewrite is ignored
+    assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+  }
+
+  private static long firstFileLength(Table table) {
+    return table.currentSnapshot().addedDataFiles(table.io()).iterator().next().fileSizeInBytes();
+  }
+
+  private static TableChange tableChangeWithLastSnapshot(Table table, TableChange previous) {
+    List<DataFile> dataFiles =
+        Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io()).iterator());
+    List<DeleteFile> deleteFiles =
+        Lists.newArrayList(table.currentSnapshot().addedDeleteFiles(table.io()).iterator());
+
+    long dataSize = dataFiles.stream().mapToLong(d -> d.fileSizeInBytes()).sum();
+    long deleteSize = deleteFiles.stream().mapToLong(d -> d.fileSizeInBytes()).sum();
+    boolean hasDelete = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().hasNext();
+
+    return new TableChange(
+        previous.dataFileNum() + dataFiles.size(),
+        previous.deleteFileNum() + deleteFiles.size(),
+        previous.dataFileSize() + dataSize,
+        previous.deleteFileSize() + deleteSize,
+        previous.commitNum() + 1);
+  }
+}
