This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c7d3ef4436 Flink: Maintenance - MonitorSource (#10308)
c7d3ef4436 is described below
commit c7d3ef44367c46875f0367d312372fb2f246ae73
Author: pvary <[email protected]>
AuthorDate: Thu Jun 6 09:51:25 2024 +0200
Flink: Maintenance - MonitorSource (#10308)
---
.../flink/maintenance/operator/MonitorSource.java | 206 ++++++++++++
.../operator/SingleThreadedIteratorSource.java | 197 +++++++++++
.../flink/maintenance/operator/TableChange.java | 133 ++++++++
.../flink/maintenance/operator/CollectingSink.java | 115 +++++++
.../maintenance/operator/FlinkSqlExtension.java | 132 ++++++++
.../operator/FlinkStreamingTestUtils.java | 73 +++++
.../flink/maintenance/operator/ManualSource.java | 316 ++++++++++++++++++
.../maintenance/operator/OperatorTestBase.java | 51 +++
.../maintenance/operator/TestMonitorSource.java | 362 +++++++++++++++++++++
9 files changed, 1585 insertions(+)
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
new file mode 100644
index 0000000000..d74b2349b1
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Monitors an Iceberg table for changes */
+@Internal
+public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
+ private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);
+
+ private final TableLoader tableLoader;
+ private final RateLimiterStrategy rateLimiterStrategy;
+ private final long maxReadBack;
+
+ /**
+ * Creates a {@link org.apache.flink.api.connector.source.Source} which monitors an Iceberg table
+ * for changes.
+ *
+ * @param tableLoader used for accessing the table
+ * @param rateLimiterStrategy limits the frequency the table is checked
+ * @param maxReadBack sets the number of snapshots read before stopping change collection
+ */
+ public MonitorSource(
+ TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) {
+ Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+ Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should not be null");
+ Preconditions.checkArgument(maxReadBack > 0, "Need to read at least 1 snapshot to work");
+
+ this.tableLoader = tableLoader;
+ this.rateLimiterStrategy = rateLimiterStrategy;
+ this.maxReadBack = maxReadBack;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public TypeInformation<TableChange> getProducedType() {
+ return TypeInformation.of(TableChange.class);
+ }
+
+ @Override
+ Iterator<TableChange> createIterator() {
+ return new TableChangeIterator(tableLoader, null, maxReadBack);
+ }
+
+ @Override
+ SimpleVersionedSerializer<Iterator<TableChange>> iteratorSerializer() {
+ return new TableChangeIteratorSerializer(tableLoader, maxReadBack);
+ }
+
+ @Override
+ public SourceReader<TableChange, GlobalSplit<TableChange>> createReader(
+ SourceReaderContext readerContext) throws Exception {
+ RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(1);
+ return new RateLimitedSourceReader<>(super.createReader(readerContext), rateLimiter);
+ }
+
+ /** The Iterator which returns the latest changes on an Iceberg table. */
+ @VisibleForTesting
+ static class TableChangeIterator implements Iterator<TableChange> {
+ private Long lastSnapshotId;
+ private final long maxReadBack;
+ private final Table table;
+
+ TableChangeIterator(TableLoader tableLoader, Long lastSnapshotId, long maxReadBack) {
+ this.lastSnapshotId = lastSnapshotId;
+ this.maxReadBack = maxReadBack;
+ tableLoader.open();
+ this.table = tableLoader.loadTable();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public TableChange next() {
+ try {
+ table.refresh();
+ Snapshot currentSnapshot = table.currentSnapshot();
+ Long current = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
+ Long checking = current;
+ TableChange event = TableChange.empty();
+ long readBack = 0;
+ while (checking != null && !checking.equals(lastSnapshotId) && ++readBack <= maxReadBack) {
+ Snapshot snapshot = table.snapshot(checking);
+ if (snapshot != null) {
+ if (!DataOperations.REPLACE.equals(snapshot.operation())) {
+ LOG.debug("Reading snapshot {}", snapshot.snapshotId());
+ event.merge(new TableChange(snapshot, table.io()));
+ } else {
+ LOG.debug("Skipping replace snapshot {}", snapshot.snapshotId());
+ }
+
+ checking = snapshot.parentId();
+ } else {
+ // If the last snapshot has been removed from the history
+ checking = null;
+ }
+ }
+
+ lastSnapshotId = current;
+ return event;
+ } catch (Exception e) {
+ LOG.warn("Failed to fetch table changes for {}", table, e);
+ return TableChange.empty();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("lastSnapshotId", lastSnapshotId)
+ .add("maxReadBack", maxReadBack)
+ .add("table", table)
+ .toString();
+ }
+ }
+
+ private static final class TableChangeIteratorSerializer
+ implements SimpleVersionedSerializer<Iterator<TableChange>> {
+
+ private static final int CURRENT_VERSION = 1;
+ private final TableLoader tableLoader;
+ private final long maxReadBack;
+
+ TableChangeIteratorSerializer(TableLoader tableLoader, long maxReadBack) {
+ this.tableLoader = tableLoader;
+ this.maxReadBack = maxReadBack;
+ }
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(Iterator<TableChange> iterator) throws IOException {
+ Preconditions.checkArgument(
+ iterator instanceof TableChangeIterator,
+ "Use TableChangeIterator iterator. Found incompatible type: %s",
+ iterator.getClass());
+
+ TableChangeIterator tableChangeIterator = (TableChangeIterator) iterator;
+ DataOutputSerializer out = new DataOutputSerializer(8);
+ long toStore =
+ tableChangeIterator.lastSnapshotId != null ? tableChangeIterator.lastSnapshotId : -1L;
+ out.writeLong(toStore);
+ return out.getCopyOfBuffer();
+ }
+
+ @Override
+ public TableChangeIterator deserialize(int version, byte[] serialized) throws IOException {
+ if (version == CURRENT_VERSION) {
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ long fromStore = in.readLong();
+ return new TableChangeIterator(
+ tableLoader, fromStore != -1 ? fromStore : null, maxReadBack);
+ } else {
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+ }
+}
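
For context, a minimal sketch of how this source is expected to be wired into a job, mirroring TestMonitorSource further down; the table location and the rate are illustrative assumptions, not part of this commit:

    // Hedged sketch: assumes a Hadoop-backed table at a hypothetical location.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");
    DataStream<TableChange> changes =
        env.fromSource(
                new MonitorSource(tableLoader, RateLimiterStrategy.perSecond(1.0), Long.MAX_VALUE),
                WatermarkStrategy.noWatermarks(),
                "MonitorSource")
            .forceNonParallel(); // the source checks that parallelism is 1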
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
new file mode 100644
index 0000000000..20c7684d97
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Implementation of the Source V2 API which uses an iterator to read the elements, and uses a
+ * single thread to do so.
+ *
+ * @param <T> The return type of the source
+ */
+@Internal
+public abstract class SingleThreadedIteratorSource<T>
+ implements Source<
+ T,
+ SingleThreadedIteratorSource.GlobalSplit<T>,
+ Collection<SingleThreadedIteratorSource.GlobalSplit<T>>>,
+ ResultTypeQueryable<T> {
+ private static final String PARALLELISM_ERROR = "Parallelism should be set to 1";
+
+ /**
+ * Creates the iterator to return the elements which are then emitted by the source.
+ *
+ * @return iterator for the elements
+ */
+ abstract Iterator<T> createIterator();
+
+ /**
+ * Serializes the iterator, which is used to save and restore the state of the source.
+ *
+ * @return serializer for the iterator
+ */
+ abstract SimpleVersionedSerializer<Iterator<T>> iteratorSerializer();
+
+ @Override
+ public SplitEnumerator<GlobalSplit<T>, Collection<GlobalSplit<T>>> createEnumerator(
+ SplitEnumeratorContext<GlobalSplit<T>> enumContext) {
+ Preconditions.checkArgument(enumContext.currentParallelism() == 1, PARALLELISM_ERROR);
+ return new IteratorSourceEnumerator<>(
+ enumContext, ImmutableList.of(new GlobalSplit<>(createIterator())));
+ }
+
+ @Override
+ public SplitEnumerator<GlobalSplit<T>, Collection<GlobalSplit<T>>> restoreEnumerator(
+ SplitEnumeratorContext<GlobalSplit<T>> enumContext, Collection<GlobalSplit<T>> checkpoint) {
+ Preconditions.checkArgument(enumContext.currentParallelism() == 1, PARALLELISM_ERROR);
+ return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<GlobalSplit<T>> getSplitSerializer() {
+ return new SplitSerializer<>(iteratorSerializer());
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Collection<GlobalSplit<T>>> getEnumeratorCheckpointSerializer() {
+ return new EnumeratorSerializer<>(iteratorSerializer());
+ }
+
+ @Override
+ public SourceReader<T, GlobalSplit<T>> createReader(SourceReaderContext readerContext)
+ throws Exception {
+ Preconditions.checkArgument(readerContext.getIndexOfSubtask() == 0, PARALLELISM_ERROR);
+ return new IteratorSourceReader<>(readerContext);
+ }
+
+ /** The single split of the {@link SingleThreadedIteratorSource}. */
+ static class GlobalSplit<T> implements IteratorSourceSplit<T, Iterator<T>> {
+ private final Iterator<T> iterator;
+
+ GlobalSplit(Iterator<T> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public String splitId() {
+ return "1";
+ }
+
+ @Override
+ public Iterator<T> getIterator() {
+ return iterator;
+ }
+
+ @Override
+ public IteratorSourceSplit<T, Iterator<T>> getUpdatedSplitForIterator(
+ final Iterator<T> newIterator) {
+ return new GlobalSplit<>(newIterator);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("GlobalSplit (%s)", iterator);
+ }
+ }
+
+ private static final class SplitSerializer<T>
+ implements SimpleVersionedSerializer<GlobalSplit<T>> {
+ private final SimpleVersionedSerializer<Iterator<T>> iteratorSerializer;
+
+ SplitSerializer(SimpleVersionedSerializer<Iterator<T>> iteratorSerializer) {
+ this.iteratorSerializer = iteratorSerializer;
+ }
+
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(GlobalSplit<T> split) throws IOException {
+ return iteratorSerializer.serialize(split.iterator);
+ }
+
+ @Override
+ public GlobalSplit<T> deserialize(int version, byte[] serialized) throws IOException {
+ return new GlobalSplit<>(iteratorSerializer.deserialize(version, serialized));
+ }
+ }
+
+ private static final class EnumeratorSerializer<T>
+ implements SimpleVersionedSerializer<Collection<GlobalSplit<T>>> {
+ private static final int CURRENT_VERSION = 1;
+ private final SimpleVersionedSerializer<Iterator<T>> iteratorSerializer;
+
+ EnumeratorSerializer(SimpleVersionedSerializer<Iterator<T>> iteratorSerializer) {
+ this.iteratorSerializer = iteratorSerializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(Collection<GlobalSplit<T>> checkpoint) throws IOException {
+ Preconditions.checkArgument(checkpoint.size() < 2, PARALLELISM_ERROR);
+ if (checkpoint.isEmpty()) {
+ return new byte[] {0};
+ } else {
+ byte[] iterator = iteratorSerializer.serialize(checkpoint.iterator().next().getIterator());
+ byte[] result = new byte[iterator.length + 1];
+ result[0] = 1;
+ System.arraycopy(iterator, 0, result, 1, iterator.length);
+ return result;
+ }
+ }
+
+ @Override
+ public Collection<GlobalSplit<T>> deserialize(int version, byte[] serialized)
+ throws IOException {
+ if (serialized[0] == 0) {
+ return Lists.newArrayList();
+ } else {
+ byte[] iterator = new byte[serialized.length - 1];
+ System.arraycopy(serialized, 1, iterator, 0, serialized.length - 1);
+ return Lists.newArrayList(
+ new GlobalSplit<>(iteratorSerializer.deserialize(version, iterator)));
+ }
+ }
+ }
+}
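
A subclass only has to supply the iterator, its serializer, and the produced type. A minimal sketch, assuming a trivial unbounded counter (CountingSource and its semantics are hypothetical, not part of this commit; imports follow MonitorSource.java above):

    // Hedged sketch of a hypothetical subclass; state is just the next value to emit.
    @Internal
    class CountingSource extends SingleThreadedIteratorSource<Long> {
      @Override
      public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
      }

      @Override
      public TypeInformation<Long> getProducedType() {
        return TypeInformation.of(Long.class);
      }

      @Override
      Iterator<Long> createIterator() {
        return new CountingIterator(0L);
      }

      @Override
      SimpleVersionedSerializer<Iterator<Long>> iteratorSerializer() {
        return new SimpleVersionedSerializer<Iterator<Long>>() {
          @Override
          public int getVersion() {
            return 1;
          }

          @Override
          public byte[] serialize(Iterator<Long> iterator) throws IOException {
            // Persist only the restartable position, like MonitorSource does.
            DataOutputSerializer out = new DataOutputSerializer(8);
            out.writeLong(((CountingIterator) iterator).next);
            return out.getCopyOfBuffer();
          }

          @Override
          public Iterator<Long> deserialize(int version, byte[] serialized) throws IOException {
            return new CountingIterator(new DataInputDeserializer(serialized).readLong());
          }
        };
      }

      private static class CountingIterator implements Iterator<Long> {
        private long next;

        CountingIterator(long next) {
          this.next = next;
        }

        @Override
        public boolean hasNext() {
          return true;
        }

        @Override
        public Long next() {
          return next++;
        }
      }
    }

This mirrors the pattern MonitorSource uses above: the iterator holds the restartable position, and the serializer persists only that position.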
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
new file mode 100644
index 0000000000..452ed80ed0
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Objects;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/** Event describing changes in an Iceberg table */
+@Internal
+class TableChange {
+ private int dataFileNum;
+ private int deleteFileNum;
+ private long dataFileSize;
+ private long deleteFileSize;
+ private int commitNum;
+
+ TableChange(
+ int dataFileNum, int deleteFileNum, long dataFileSize, long deleteFileSize, int commitNum) {
+ this.dataFileNum = dataFileNum;
+ this.deleteFileNum = deleteFileNum;
+ this.dataFileSize = dataFileSize;
+ this.deleteFileSize = deleteFileSize;
+ this.commitNum = commitNum;
+ }
+
+ TableChange(Snapshot snapshot, FileIO io) {
+ Iterable<DataFile> dataFiles = snapshot.addedDataFiles(io);
+ Iterable<DeleteFile> deleteFiles = snapshot.addedDeleteFiles(io);
+
+ dataFiles.forEach(
+ dataFile -> {
+ this.dataFileNum++;
+ this.dataFileSize += dataFile.fileSizeInBytes();
+ });
+
+ deleteFiles.forEach(
+ deleteFile -> {
+ this.deleteFileNum++;
+ this.deleteFileSize += deleteFile.fileSizeInBytes();
+ });
+
+ this.commitNum = 1;
+ }
+
+ static TableChange empty() {
+ return new TableChange(0, 0, 0L, 0L, 0);
+ }
+
+ int dataFileNum() {
+ return dataFileNum;
+ }
+
+ int deleteFileNum() {
+ return deleteFileNum;
+ }
+
+ long dataFileSize() {
+ return dataFileSize;
+ }
+
+ long deleteFileSize() {
+ return deleteFileSize;
+ }
+
+ public int commitNum() {
+ return commitNum;
+ }
+
+ public void merge(TableChange other) {
+ this.dataFileNum += other.dataFileNum;
+ this.deleteFileNum += other.deleteFileNum;
+ this.dataFileSize += other.dataFileSize;
+ this.deleteFileSize += other.deleteFileSize;
+ this.commitNum += other.commitNum;
+ }
+
+ TableChange copy() {
+ return new TableChange(dataFileNum, deleteFileNum, dataFileSize, deleteFileSize, commitNum);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("dataFileNum", dataFileNum)
+ .add("deleteFileNum", deleteFileNum)
+ .add("dataFileSize", dataFileSize)
+ .add("deleteFileSize", deleteFileSize)
+ .add("commitNum", commitNum)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ TableChange that = (TableChange) other;
+ return dataFileNum == that.dataFileNum
+ && deleteFileNum == that.deleteFileNum
+ && dataFileSize == that.dataFileSize
+ && deleteFileSize == that.deleteFileSize
+ && commitNum == that.commitNum;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dataFileNum, deleteFileNum, dataFileSize, deleteFileSize, commitNum);
+ }
+}
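
Since merge is plain field-wise accumulation, consecutive snapshot events can be folded into a single event; an illustrative sequence (all numbers made up):

    TableChange acc = TableChange.empty();
    acc.merge(new TableChange(2, 0, 2048L, 0L, 1)); // commit #1: two data files, 2 KB
    acc.merge(new TableChange(1, 1, 1024L, 512L, 1)); // commit #2: one data file plus a delete file
    // acc now equals new TableChange(3, 1, 3072L, 512L, 2)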
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
new file mode 100644
index 0000000000..a49459d61a
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Sink for collecting output during testing. */
+class CollectingSink<T> implements Sink<T> {
+ private static final long serialVersionUID = 1L;
+ private static final List<BlockingQueue<Object>> queues =
+ Collections.synchronizedList(Lists.newArrayListWithExpectedSize(1));
+ private static final AtomicInteger numSinks = new AtomicInteger(-1);
+ private final int index;
+
+ /** Creates a new sink which collects the elements received. */
+ CollectingSink() {
+ this.index = numSinks.incrementAndGet();
+ queues.add(new LinkedBlockingQueue<>());
+ }
+
+ /**
+ * Gets all the remaining output received by this {@link Sink}.
+ *
+ * @return all the remaining output
+ */
+ List<T> remainingOutput() {
+ return Lists.newArrayList((BlockingQueue<T>) queues.get(this.index));
+ }
+
+ /**
+ * Check if there is no remaining output received by this {@link Sink}.
+ *
+ * @return <code>true</code> if there is no remaining output
+ */
+ boolean isEmpty() {
+ return queues.get(this.index).isEmpty();
+ }
+
+ /**
+ * Waits until the next element is received by the {@link Sink}.
+ *
+ * @param timeout for the poll
+ * @return The first element received by this {@link Sink}
+ * @throws TimeoutException if no element is received before the timeout
+ */
+ T poll(Duration timeout) throws TimeoutException {
+ Object element;
+
+ try {
+ element = queues.get(this.index).poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (element == null) {
+ throw new TimeoutException();
+ } else {
+ return (T) element;
+ }
+ }
+
+ @Override
+ public SinkWriter<T> createWriter(InitContext context) {
+ return new CollectingWriter<>(index);
+ }
+
+ private static class CollectingWriter<T> implements SinkWriter<T> {
+ private final int index;
+
+ CollectingWriter(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public void write(T element, Context context) {
+ queues.get(index).add(element);
+ }
+
+ @Override
+ public void flush(boolean endOfInput) {
+ // Nothing to do here
+ }
+
+ @Override
+ public void close() {
+ // Nothing to do here
+ }
+ }
+}
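
In a test the sink is attached to the stream under test and then polled; a short sketch following the pattern used in TestMonitorSource below (the job name and timeout are illustrative):

    CollectingSink<TableChange> result = new CollectingSink<>();
    events.sinkTo(result); // events is the DataStream under test
    JobClient jobClient = env.executeAsync("collecting sink demo");
    TableChange first = result.poll(Duration.ofSeconds(5L)); // TimeoutException if nothing arrives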
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java
new file mode 100644
index 0000000000..90790b373d
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * JUnit 5 extension for running Flink SQL queries. {@link
+ * org.apache.flink.test.junit5.MiniClusterExtension} is used for executing the SQL batch jobs.
+ */
+public class FlinkSqlExtension implements BeforeEachCallback, AfterEachCallback {
+ private final String catalogName;
+ private final Map<String, String> catalogProperties;
+ private final String databaseName;
+ private final Path warehouse;
+ private final CatalogLoader catalogLoader;
+ private TableEnvironment tableEnvironment;
+
+ public FlinkSqlExtension(
+ String catalogName, Map<String, String> catalogProperties, String databaseName) {
+ this.catalogName = catalogName;
+ this.catalogProperties = Maps.newHashMap(catalogProperties);
+ this.databaseName = databaseName;
+
+ // Add temporary dir as a warehouse location
+ try {
+ this.warehouse = Files.createTempDirectory("warehouse");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.catalogProperties.put(
+ CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouse));
+ this.catalogLoader =
+ CatalogLoader.hadoop(catalogName, new Configuration(), this.catalogProperties);
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ // We need to recreate the tableEnvironment for every test as the minicluster is recreated
+ this.tableEnvironment =
+ TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ exec("CREATE CATALOG %s WITH %s", catalogName, toWithClause(catalogProperties));
+ exec("CREATE DATABASE %s.%s", catalogName, databaseName);
+ exec("USE CATALOG %s", catalogName);
+ exec("USE %s", databaseName);
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws IOException {
+ List<Row> tables = exec("SHOW TABLES");
+ tables.forEach(t -> exec("DROP TABLE IF EXISTS %s", t.getField(0)));
+ exec("USE CATALOG default_catalog");
+ exec("DROP CATALOG IF EXISTS %s", catalogName);
+ Files.walk(warehouse).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ }
+
+ /**
+ * Executes an SQL query with the given parameters. The parameter substitution is done by {@link
+ * String#format(String, Object...)}.
+ *
+ * @param query to run
+ * @param parameters to substitute to the query
+ * @return The {@link Row}s returned by the query
+ */
+ public List<Row> exec(String query, Object... parameters) {
+ TableResult tableResult = tableEnvironment.executeSql(String.format(query, parameters));
+ try (CloseableIterator<Row> iter = tableResult.collect()) {
+ return Lists.newArrayList(iter);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to collect table result", e);
+ }
+ }
+
+ /**
+ * Returns the {@link TableLoader} which can be used to access the given table.
+ *
+ * @param tableName of the table
+ * @return the {@link TableLoader} for the table
+ */
+ public TableLoader tableLoader(String tableName) {
+ TableLoader tableLoader =
+ TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(databaseName, tableName));
+ tableLoader.open();
+ return tableLoader;
+ }
+
+ private static String toWithClause(Map<String, String> props) {
+ return String.format(
+ "(%s)",
+ props.entrySet().stream()
+ .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
+ .collect(Collectors.joining(",")));
+ }
+}
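
Registered as a JUnit 5 extension, each test method gets a fresh catalog and database; a sketch matching the setup in OperatorTestBase below (the table name is illustrative):

    @RegisterExtension
    final FlinkSqlExtension sql =
        new FlinkSqlExtension(
            "catalog",
            ImmutableMap.of("type", "iceberg", FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"),
            "db");

    @Test
    void smoke() {
      sql.exec("CREATE TABLE %s (id int, data varchar)", "test_table");
      sql.exec("INSERT INTO %s VALUES (1, 'a')", "test_table");
      TableLoader loader = sql.tableLoader("test_table");
    }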
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java
new file mode 100644
index 0000000000..9cdc55cb0c
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.File;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.awaitility.Awaitility;
+
+class FlinkStreamingTestUtils {
+ private FlinkStreamingTestUtils() {
+ // Do not instantiate
+ }
+
+ /**
+ * Close the {@link JobClient} and wait for the job closure. If the savepointDir is specified, it
+ * stops the job with a savepoint.
+ *
+ * @param jobClient the job to close
+ * @param savepointDir the directory to store the last savepoint. If <code>null</code> then
+ * stop without a savepoint.
+ * @return configuration for restarting the job from the savepoint
+ */
+ static Configuration closeJobClient(JobClient jobClient, File savepointDir) {
+ Configuration conf = new Configuration();
+ if (jobClient != null) {
+ if (savepointDir != null) {
+ // Stop with savepoint
+ jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL);
+ // Wait until the savepoint is created and the job has been stopped
+ Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1);
+ conf.set(
+ SavepointConfigOptions.SAVEPOINT_PATH,
+ savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath());
+ } else {
+ jobClient.cancel();
+ }
+
+ // Wait until the job has been stopped
+ Awaitility.await().until(() -> jobClient.getJobStatus().get().isTerminalState());
+ return conf;
+ }
+
+ return null;
+ }
+
+ /**
+ * Close the {@link JobClient} and wait for the job closure.
+ *
+ * @param jobClient the job to close
+ */
+ static void closeJobClient(JobClient jobClient) {
+ closeJobClient(jobClient, null);
+ }
+}
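
The savepoint variant returns the configuration needed to resume; a sketch of the stop-and-restore round trip used by testStateRestore below (savepointDir is assumed to be a JUnit @TempDir):

    JobClient jobClient = env.executeAsync("monitored job");
    // ... drive the job and assert on its output ...
    Configuration conf = closeJobClient(jobClient, savepointDir); // stop with savepoint
    StreamExecutionEnvironment restoredEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    // rebuild the same topology on restoredEnv; the source resumes from the savepoint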
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
new file mode 100644
index 0000000000..e08742a89d
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.jetbrains.annotations.Nullable;
+
+/** Testing source implementation for Flink sources which can be triggered manually. */
+class ManualSource<T>
+ implements Source<T, ManualSource.DummySplit, ManualSource.DummyCheckpoint>,
+ ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = 1L;
+ private static final List<ArrayDeque<Tuple2<?, Long>>> queues =
+ Collections.synchronizedList(Lists.newArrayList());
+ private static final List<CompletableFuture<Void>> availabilities =
+ Collections.synchronizedList(Lists.newArrayList());
+ private static int numSources = 0;
+ private final TypeInformation<T> type;
+ private final int index;
+ private transient DataStream<T> stream;
+ private final transient StreamExecutionEnvironment env;
+
+ /**
+ * Creates a new source for testing.
+ *
+ * @param env to register the source
+ * @param type of the events returned by the source
+ */
+ ManualSource(StreamExecutionEnvironment env, TypeInformation<T> type) {
+ this.type = type;
+ this.env = env;
+ this.index = numSources++;
+ queues.add(Queues.newArrayDeque());
+ availabilities.add(new CompletableFuture<>());
+ }
+
+ /**
+ * Emit a new record from the source.
+ *
+ * @param event to emit
+ */
+ void sendRecord(T event) {
+ this.sendInternal(Tuple2.of(event, null));
+ }
+
+ /**
+ * Emit a new record with the given event time from the source.
+ *
+ * @param event to emit
+ * @param eventTime of the event
+ */
+ void sendRecord(T event, long eventTime) {
+ this.sendInternal(Tuple2.of(event, eventTime));
+ }
+
+ /**
+ * Emit a watermark from the source.
+ *
+ * @param timeStamp of the watermark
+ */
+ void sendWatermark(long timeStamp) {
+ this.sendInternal(Tuple2.of(null, timeStamp));
+ }
+
+ /** Mark the source as finished. */
+ void markFinished() {
+ this.sendWatermark(Long.MAX_VALUE);
+ this.sendInternal(Tuple2.of(null, null));
+ }
+
+ /**
+ * Get the {@link DataStream} for this source.
+ *
+ * @return the stream emitted by this source
+ */
+ DataStream<T> dataStream() {
+ if (this.stream == null) {
+ this.stream =
+ this.env
+ .fromSource(this, WatermarkStrategy.noWatermarks(), "ManualSource-" + index, type)
+ .forceNonParallel();
+ }
+
+ return this.stream;
+ }
+
+ private void sendInternal(Tuple2<?, Long> tuple) {
+ queues.get(index).offer(tuple);
+ availabilities.get(index).complete(null);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SplitEnumerator<DummySplit, DummyCheckpoint> createEnumerator(
+ SplitEnumeratorContext<DummySplit> enumContext) {
+ return new DummyCheckpointEnumerator();
+ }
+
+ @Override
+ public SplitEnumerator<DummySplit, DummyCheckpoint> restoreEnumerator(
+ SplitEnumeratorContext<DummySplit> enumContext, DummyCheckpoint checkpoint) {
+ return new DummyCheckpointEnumerator();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<DummySplit> getSplitSerializer() {
+ return new NoOpDummySplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<DummyCheckpoint> getEnumeratorCheckpointSerializer() {
+ return new NoOpDummyCheckpointSerializer();
+ }
+
+ @Override
+ public SourceReader<T, DummySplit> createReader(SourceReaderContext sourceReaderContext) {
+ return new SourceReader<T, DummySplit>() {
+ @Override
+ public void start() {
+ // Do nothing
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<T> output) {
+ Tuple2<T, Long> next = (Tuple2<T, Long>) queues.get(index).poll();
+
+ if (next != null) {
+ if (next.f0 == null) {
+ // No more input
+ return InputStatus.END_OF_INPUT;
+ }
+
+ if (next.f1 == null) {
+ // No event time set
+ output.collect(next.f0);
+ } else {
+ // With event time
+ output.collect(next.f0, next.f1);
+ }
+ }
+
+ availabilities.set(index, new CompletableFuture<>());
+ return queues.get(index).isEmpty()
+ ? InputStatus.NOTHING_AVAILABLE
+ : InputStatus.MORE_AVAILABLE;
+ }
+
+ @Override
+ public List<DummySplit> snapshotState(long checkpointId) {
+ return Lists.newArrayList(new DummySplit());
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return availabilities.get(index);
+ }
+
+ @Override
+ public void addSplits(List<DummySplit> splits) {
+ // do nothing
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ // do nothing
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ };
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return this.type;
+ }
+
+ /**
+ * Placeholder because the ManualSource itself implicitly represents the only split and does not
+ * require an actual split object.
+ */
+ public static class DummySplit implements SourceSplit {
+ @Override
+ public String splitId() {
+ return "dummy";
+ }
+ }
+
+ /**
+ * Placeholder because the ManualSource does not support fault-tolerance and thus does not
+ * require actual checkpointing.
+ */
+ public static class DummyCheckpoint {}
+
+ /** Placeholder because the ManualSource does not need enumeration, but checkpointing needs it. */
+ private static class DummyCheckpointEnumerator
+ implements SplitEnumerator<DummySplit, DummyCheckpoint> {
+
+ @Override
+ public void start() {
+ // do nothing
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ // do nothing
+ }
+
+ @Override
+ public void addSplitsBack(List<DummySplit> splits, int subtaskId) {
+ // do nothing
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ // do nothing
+ }
+
+ @Override
+ public DummyCheckpoint snapshotState(long checkpointId) {
+ return new DummyCheckpoint();
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+
+ /**
+ * Not used - only required to avoid NullPointerException. The split is not transferred from the
+ * enumerator, it is implicitly represented by the ManualSource.
+ */
+ private static class NoOpDummySplitSerializer implements SimpleVersionedSerializer<DummySplit> {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(DummySplit split) {
+ return new byte[0];
+ }
+
+ @Override
+ public DummySplit deserialize(int version, byte[] serialized) {
+ return new DummySplit();
+ }
+ }
+
+ /**
+ * Not used - only required to avoid NullPointerException. The split is not transferred from the
+ * enumerator, it is implicitly represented by the ManualSource.
+ */
+ private static class NoOpDummyCheckpointSerializer
+ implements SimpleVersionedSerializer<DummyCheckpoint> {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(DummyCheckpoint split) {
+ return new byte[0];
+ }
+
+ @Override
+ public DummyCheckpoint deserialize(int version, byte[] serialized) {
+ return new DummyCheckpoint();
+ }
+ }
+}
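
A test drives this source explicitly and controls event time; a minimal sketch (the payloads and job name are illustrative):

    ManualSource<String> source = new ManualSource<>(env, TypeInformation.of(String.class));
    source.dataStream().sinkTo(new CollectingSink<>());
    JobClient jobClient = env.executeAsync("manual source demo");
    source.sendRecord("a"); // without event time
    source.sendRecord("b", 1_000L); // with event time
    source.sendWatermark(2_000L);
    source.markFinished(); // emits a Long.MAX_VALUE watermark, then END_OF_INPUT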
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
new file mode 100644
index 0000000000..269ae681b0
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class OperatorTestBase {
+ private static final int NUMBER_TASK_MANAGERS = 1;
+ private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+ static final String TABLE_NAME = "test_table";
+
+ @RegisterExtension
+ protected static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+ .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+ .setConfiguration(new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG))
+ .build());
+
+ @RegisterExtension
+ final FlinkSqlExtension sql =
+ new FlinkSqlExtension(
+ "catalog",
+ ImmutableMap.of("type", "iceberg", FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"),
+ "db");
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
new file mode 100644
index 0000000000..876d642145
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.flink.maintenance.operator.FlinkStreamingTestUtils.closeJobClient;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class TestMonitorSource extends OperatorTestBase {
+ private static final TableChange EMPTY_EVENT = TableChange.empty();
+ private static final RateLimiterStrategy HIGH_RATE = RateLimiterStrategy.perSecond(100.0);
+ private static final RateLimiterStrategy LOW_RATE = RateLimiterStrategy.perSecond(1.0 / 10000.0);
+
+ @TempDir private File checkpointDir;
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testChangeReaderIterator(boolean withDelete) {
+ if (withDelete) {
+ sql.exec(
+ "CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) WITH ('format-version'='2', 'write.upsert.enabled'='true')",
+ TABLE_NAME);
+ } else {
+ sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+ }
+
+ TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+ tableLoader.open();
+ Table table = tableLoader.loadTable();
+
+ MonitorSource.TableChangeIterator iterator =
+ new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE);
+
+ // For an empty table we get an empty result
+ assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+
+ // Add a single commit and get back the commit data in the event
+ sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+ table.refresh();
+ TableChange expected = tableChangeWithLastSnapshot(table, TableChange.empty());
+ assertThat(iterator.next()).isEqualTo(expected);
+ // Make sure that consecutive calls do not return the data again
+ assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+
+ // Add two more commits, but fetch the data in one loop
+ sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME);
+ table.refresh();
+ expected = tableChangeWithLastSnapshot(table, TableChange.empty());
+
+ sql.exec("INSERT INTO %s VALUES (3, 'c')", TABLE_NAME);
+ table.refresh();
+ expected = tableChangeWithLastSnapshot(table, expected);
+
+ assertThat(iterator.next()).isEqualTo(expected);
+ // Make sure that consecutive calls do not return the data again
+ assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+ }
+
+ /**
+ * Create a table and check that the source returns the data as new commits arrive in the table.
+ */
+ @Test
+ void testSource() throws Exception {
+ sql.exec(
+ "CREATE TABLE %s (id int, data varchar) "
+ + "WITH ('flink.max-continuous-empty-commits'='100000')",
+ TABLE_NAME);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+ tableLoader.open();
+ Table table = tableLoader.loadTable();
+ DataStream<TableChange> events =
+ env.fromSource(
+ new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE),
+ WatermarkStrategy.noWatermarks(),
+ "TableChangeSource")
+ .forceNonParallel();
+
+ // Sink to collect the results
+ CollectingSink<TableChange> result = new CollectingSink<>();
+ events.sinkTo(result);
+
+ JobClient jobClient = null;
+ try {
+ // First result is an empty event
+ jobClient = env.executeAsync("Table Change Source Test");
+ assertThat(result.poll(Duration.ofSeconds(5L))).isEqualTo(EMPTY_EVENT);
+
+ // Insert some data
+ File dataDir = new File(new Path(table.location(), "data").toUri().getPath());
+ dataDir.mkdir();
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, dataDir.toPath());
+ List<Record> batch1 = RandomGenericData.generate(table.schema(), 2, 1);
+ dataAppender.appendToTable(batch1);
+
+ // Wait until the changes are committed
+ Awaitility.await()
+ .until(
+ () -> {
+ table.refresh();
+ return table.currentSnapshot() != null;
+ });
+
+ table.refresh();
+ long size = firstFileLength(table);
+
+ // Wait until the first non-empty event has arrived, and check the expected result
+ Awaitility.await()
+ .until(
+ () -> {
+ TableChange newEvent = result.poll(Duration.ofSeconds(5L));
+ // Fetch every empty event from the beginning
+ while (newEvent.equals(EMPTY_EVENT)) {
+ newEvent = result.poll(Duration.ofSeconds(5L));
+ }
+
+ // The first non-empty event should contain the expected value
+ return newEvent.equals(new TableChange(1, 0, size, 0L, 1));
+ });
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
+
+ /** Check that the {@link MonitorSource} operator state is restored correctly. */
+ @Test
+ void testStateRestore(@TempDir File savepointDir) throws Exception {
+ sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+ sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+
+ Configuration config = new Configuration();
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + checkpointDir.getPath());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.enableCheckpointing(1000);
+
+ TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+ tableLoader.open();
+ DataStream<TableChange> events =
+ env.fromSource(
+ new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE),
+ WatermarkStrategy.noWatermarks(),
+ "TableChangeSource")
+ .forceNonParallel();
+
+ // Sink to collect the results
+ CollectingSink<TableChange> result = new CollectingSink<>();
+ events.sinkTo(result);
+
+ // Start the job
+ Configuration conf;
+ JobClient jobClient = null;
+ AtomicReference<TableChange> firstNonEmptyEvent = new AtomicReference<>();
+ try {
+ jobClient = env.executeAsync("Table Change Source Test");
+
+ Awaitility.await()
+ .until(
+ () -> {
+ TableChange newEvent = result.poll(Duration.ofSeconds(5L));
+ // Fetch every empty event from the beginning
+ while (newEvent.equals(EMPTY_EVENT)) {
+ newEvent = result.poll(Duration.ofSeconds(5L));
+ }
+
+ // The first non-empty event should contain the expected value
+ firstNonEmptyEvent.set(newEvent);
+ return true;
+ });
+ } finally {
+ // Stop with savepoint
+ conf = closeJobClient(jobClient, savepointDir);
+ }
+
+ // Restore from savepoint, create the same topology with a different env
+ env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ events =
+ env.fromSource(
+ new MonitorSource(tableLoader, LOW_RATE, Long.MAX_VALUE),
+ WatermarkStrategy.noWatermarks(),
+ "TableChangeSource")
+ .forceNonParallel();
+ CollectingSink<TableChange> resultWithSavepoint = new CollectingSink<>();
+ events.sinkTo(resultWithSavepoint);
+
+ // Make sure that the job with restored source does not read new records from the table
+ JobClient clientWithSavepoint = null;
+ try {
+ clientWithSavepoint = env.executeAsync("Table Change Source test with savepoint");
+
+ assertThat(resultWithSavepoint.poll(Duration.ofSeconds(5L))).isEqualTo(EMPTY_EVENT);
+ } finally {
+ closeJobClient(clientWithSavepoint, null);
+ }
+
+ // Restore without savepoint
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ events =
+ env.fromSource(
+ new MonitorSource(tableLoader, LOW_RATE, Long.MAX_VALUE),
+ WatermarkStrategy.noWatermarks(),
+ "TableChangeSource")
+ .forceNonParallel();
+ CollectingSink<TableChange> resultWithoutSavepoint = new CollectingSink<>();
+ events.sinkTo(resultWithoutSavepoint);
+
+ // Make sure that a new job without state reads the event as expected
+ JobClient clientWithoutSavepoint = null;
+ try {
+ clientWithoutSavepoint = env.executeAsync("Table Change Source Test without savepoint");
+ assertThat(resultWithoutSavepoint.poll(Duration.ofSeconds(5L)))
+ .isEqualTo(firstNonEmptyEvent.get());
+ } finally {
+ closeJobClient(clientWithoutSavepoint);
+ }
+ }
+
+ @Test
+ void testNotOneParallelismThrows() {
+ sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+ tableLoader.open();
+
+ env.fromSource(
+ new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE),
+ WatermarkStrategy.noWatermarks(),
+ "TableChangeSource")
+ .setParallelism(2)
+ .print();
+
+ assertThatThrownBy(env::execute)
+ .isInstanceOf(JobExecutionException.class)
+ .rootCause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Parallelism should be set to 1");
+ }
+
+ @Test
+ void testMaxReadBack() {
+ sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+ sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+ sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME);
+ sql.exec("INSERT INTO %s VALUES (3, 'c')", TABLE_NAME);
+
+ TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+ tableLoader.open();
+
+ MonitorSource.TableChangeIterator iterator =
+ new MonitorSource.TableChangeIterator(tableLoader, null, 1);
+
+ // For a single maxReadBack we only get a single change
+ assertThat(iterator.next().commitNum()).isEqualTo(1);
+
+ iterator = new MonitorSource.TableChangeIterator(tableLoader, null, 2);
+
+ // Expecting 2 commits/snapshots for maxReadBack=2
+ assertThat(iterator.next().commitNum()).isEqualTo(2);
+
+ iterator = new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE);
+
+ // For maxReadBack Long.MAX_VALUE we get every change
+ assertThat(iterator.next().commitNum()).isEqualTo(3);
+ }
+
+ @Test
+ void testSkipReplace() {
+ sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+ sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME);
+
+ TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+ tableLoader.open();
+
+ MonitorSource.TableChangeIterator iterator =
+ new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE);
+
+ // Read the current snapshot
+ assertThat(iterator.next().commitNum()).isEqualTo(1);
+
+ // Create a DataOperations.REPLACE snapshot
+ Table table = tableLoader.loadTable();
+ DataFile dataFile =
+ table.snapshots().iterator().next().addedDataFiles(table.io()).iterator().next();
+ RewriteFiles rewrite = tableLoader.loadTable().newRewrite();
+ // Replace the file with itself for testing purposes
+ rewrite.deleteFile(dataFile);
+ rewrite.addFile(dataFile);
+ rewrite.commit();
+
+ // Check that the rewrite is ignored
+ assertThat(iterator.next()).isEqualTo(EMPTY_EVENT);
+ }
+
+ private static long firstFileLength(Table table) {
+ return table.currentSnapshot().addedDataFiles(table.io()).iterator().next().fileSizeInBytes();
+ }
+
+ private static TableChange tableChangeWithLastSnapshot(Table table, TableChange previous) {
+ List<DataFile> dataFiles =
+ Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io()).iterator());
+ List<DeleteFile> deleteFiles =
+ Lists.newArrayList(table.currentSnapshot().addedDeleteFiles(table.io()).iterator());
+
+ long dataSize = dataFiles.stream().mapToLong(d -> d.fileSizeInBytes()).sum();
+ long deleteSize = deleteFiles.stream().mapToLong(d -> d.fileSizeInBytes()).sum();
+
+ return new TableChange(
+ previous.dataFileNum() + dataFiles.size(),
+ previous.deleteFileNum() + deleteFiles.size(),
+ previous.dataFileSize() + dataSize,
+ previous.deleteFileSize() + deleteSize,
+ previous.commitNum() + 1);
+ }
+}
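
One detail worth spelling out for readers of the tests above: the monitor checkpoints a single long, the last processed snapshot id (stored as -1L when none has been seen). A sketch of the round trip performed by TableChangeIteratorSerializer (the id value is made up):

    DataOutputSerializer out = new DataOutputSerializer(8);
    out.writeLong(1234567890L); // lastSnapshotId, or -1L before the first snapshot
    byte[] state = out.getCopyOfBuffer();

    DataInputDeserializer in = new DataInputDeserializer(state);
    long restored = in.readLong(); // -1L is mapped back to a null lastSnapshotId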