This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new ab0594bf71 Flink: Port #10484 to v1.19 (#11010) (#11117)
ab0594bf71 is described below
commit ab0594bf71a6884ee0e196470bfe4b4d3baa58b9
Author: pvary <[email protected]>
AuthorDate: Thu Sep 12 16:56:17 2024 +0300
Flink: Port #10484 to v1.19 (#11010) (#11117)
---
.../maintenance/operator/JdbcLockFactory.java | 4 +
.../flink/maintenance/operator/LockRemover.java | 144 +++++++++++++++++++++
.../operator/TableMaintenanceMetrics.java | 5 +
.../flink/maintenance/operator/TaskResult.java | 65 ++++++++++
.../flink/maintenance/operator/TriggerManager.java | 14 +-
.../flink/maintenance/operator/ManualSource.java | 16 ++-
.../maintenance/operator/TestLockRemover.java | 3 +-
.../maintenance/operator/TestLockRemover.java | 3 +-
8 files changed, 238 insertions(+), 16 deletions(-)
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
index f22be33aea..085fbfecd2 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
@@ -38,6 +38,10 @@ import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * JDBC table backed implementation of the {@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory}.
+ */
public class JdbcLockFactory implements TriggerLockFactory {
private static final Logger LOG =
LoggerFactory.getLogger(JdbcLockFactory.class);
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
new file mode 100644
index 0000000000..3c3761ef2f
--- /dev/null
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages locks and collects {@link org.apache.flink.metrics.Metric} for the Maintenance Tasks.
+ *
+ * <p>The assumptions about the locks are the following:
+ *
+ * <ul>
+ * <li>Every {@link TaskResult} is followed by a {@link Watermark} for normal {@link Trigger}s
+ * <li>For the {@link Trigger#recovery(long)} {@link Watermark} there is no element to process
+ * </ul>
+ *
+ * When processing the inputs there are 3 possibilities:
+ *
+ * <ul>
+ * <li>Normal execution - we receive a {@link TaskResult} and then a {@link Watermark} - unlocking
+ * the lock is handled by the {@link #processElement(StreamRecord)}
+ * <li>Recovery without ongoing execution (unlocking the recoveryLock) - we receive the {@link
+ * Trigger#recovery(long)} {@link Watermark} without any {@link TaskResult} - unlocking the
+ * {@link TriggerLockFactory#createRecoveryLock()} and a possible {@link
+ * TriggerLockFactory#createLock()} is handled by the {@link #processWatermark(Watermark)}
+ * (the {@link #lastProcessedTaskStartEpoch} is 0 in this case)
+ * <li>Recovery with an ongoing execution - we receive a {@link TaskResult} and then a {@link
+ * Watermark} - unlocking the {@link TriggerLockFactory#createLock()} is handled by the {@link
+ * #processElement(StreamRecord)}, unlocking the {@link
+ * TriggerLockFactory#createRecoveryLock()} is handled by the {@link
+ * #processWatermark(Watermark)} (the {@link #lastProcessedTaskStartEpoch} is the start time
+ * of the old task)
+ * </ul>
+ */
+@Internal
+public class LockRemover extends AbstractStreamOperator<Void>
+ implements OneInputStreamOperator<TaskResult, Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(LockRemover.class);
+
+ private final TriggerLockFactory lockFactory;
+ private final List<String> maintenanceTaskNames;
+
+ private transient List<Counter> succeededTaskResultCounters;
+ private transient List<Counter> failedTaskResultCounters;
+ private transient List<AtomicLong> taskLastRunDurationMs;
+ private transient TriggerLockFactory.Lock lock;
+ private transient TriggerLockFactory.Lock recoveryLock;
+ private transient long lastProcessedTaskStartEpoch = 0L;
+
+ public LockRemover(TriggerLockFactory lockFactory, List<String>
maintenanceTaskNames) {
+ Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+ Preconditions.checkArgument(
+ maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(),
+ "Invalid maintenance task names: null or empty");
+
+ this.lockFactory = lockFactory;
+ this.maintenanceTaskNames = maintenanceTaskNames;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.succeededTaskResultCounters =
+ Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+ this.failedTaskResultCounters =
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+ this.taskLastRunDurationMs =
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+ for (String name : maintenanceTaskNames) {
+ succeededTaskResultCounters.add(
+ getRuntimeContext()
+ .getMetricGroup()
+ .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+ .counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
+ failedTaskResultCounters.add(
+ getRuntimeContext()
+ .getMetricGroup()
+ .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+ .counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
+ AtomicLong duration = new AtomicLong(0);
+ taskLastRunDurationMs.add(duration);
+ getRuntimeContext()
+ .getMetricGroup()
+ .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+ .gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get);
+ }
+
+ this.lock = lockFactory.createLock();
+ this.recoveryLock = lockFactory.createRecoveryLock();
+ }
+
+ @Override
+ public void processElement(StreamRecord<TaskResult> streamRecord) {
+ TaskResult taskResult = streamRecord.getValue();
+ LOG.info(
+ "Processing result {} for task {}",
+ taskResult,
+ maintenanceTaskNames.get(taskResult.taskIndex()));
+ long duration = System.currentTimeMillis() - taskResult.startEpoch();
+ lock.unlock();
+ this.lastProcessedTaskStartEpoch = taskResult.startEpoch();
+
+ // Update the metrics
+ taskLastRunDurationMs.get(taskResult.taskIndex()).set(duration);
+ if (taskResult.success()) {
+ succeededTaskResultCounters.get(taskResult.taskIndex()).inc();
+ } else {
+ failedTaskResultCounters.get(taskResult.taskIndex()).inc();
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) {
+ if (mark.getTimestamp() > lastProcessedTaskStartEpoch) {
+ lock.unlock();
+ recoveryLock.unlock();
+ }
+ }
+}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
index ec0fd920c3..1a04461aed 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
@@ -28,6 +28,11 @@ public class TableMaintenanceMetrics {
public static final String TRIGGERED = "triggered";
public static final String NOTHING_TO_TRIGGER = "nothingToTrigger";
+ // LockRemover metrics
+ public static final String SUCCEEDED_TASK_COUNTER = "succeededTasks";
+ public static final String FAILED_TASK_COUNTER = "failedTasks";
+ public static final String LAST_RUN_DURATION_MS = "lastRunDurationMs";
+
private TableMaintenanceMetrics() {
// do not instantiate
}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java
new file mode 100644
index 0000000000..06f10f1c1d
--- /dev/null
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/** The result of a single Maintenance Task. */
+@Internal
+public class TaskResult {
+ private final int taskIndex;
+ private final long startEpoch;
+ private final boolean success;
+ private final List<Exception> exceptions;
+
+ public TaskResult(int taskIndex, long startEpoch, boolean success,
List<Exception> exceptions) {
+ this.taskIndex = taskIndex;
+ this.startEpoch = startEpoch;
+ this.success = success;
+ this.exceptions = exceptions;
+ }
+
+ public int taskIndex() {
+ return taskIndex;
+ }
+
+ public long startEpoch() {
+ return startEpoch;
+ }
+
+ public boolean success() {
+ return success;
+ }
+
+ public List<Exception> exceptions() {
+ return exceptions;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("taskIndex", taskIndex)
+ .add("startEpoch", startEpoch)
+ .add("success", success)
+ .add("exceptions", exceptions)
+ .toString();
+ }
+}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
index f4c3c1d47c..dc95b27af0 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -63,7 +63,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean,
TableChange, Trigger>
private final TableLoader tableLoader;
private final TriggerLockFactory lockFactory;
- private final List<String> taskNames;
+ private final List<String> maintenanceTaskNames;
private final List<TriggerEvaluator> evaluators;
private final long minFireDelayMs;
private final long lockCheckDelayMs;
@@ -92,25 +92,27 @@ class TriggerManager extends KeyedProcessFunction<Boolean,
TableChange, Trigger>
TriggerManager(
TableLoader tableLoader,
TriggerLockFactory lockFactory,
- List<String> taskNames,
+ List<String> maintenanceTaskNames,
List<TriggerEvaluator> evaluators,
long minFireDelayMs,
long lockCheckDelayMs) {
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
Preconditions.checkArgument(
- taskNames != null && !taskNames.isEmpty(), "Invalid task names: null
or empty");
+ maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(),
+ "Invalid maintenance task names: null or empty");
Preconditions.checkArgument(
evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null
or empty");
Preconditions.checkArgument(
- taskNames.size() == evaluators.size(), "Provide a name and evaluator
for all of the tasks");
+ maintenanceTaskNames.size() == evaluators.size(),
+ "Provide a name and evaluator for all of the maintenance tasks");
Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should
be at least 1.");
Preconditions.checkArgument(
lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1
ms.");
this.tableLoader = tableLoader;
this.lockFactory = lockFactory;
- this.taskNames = taskNames;
+ this.maintenanceTaskNames = maintenanceTaskNames;
this.evaluators = evaluators;
this.minFireDelayMs = minFireDelayMs;
this.lockCheckDelayMs = lockCheckDelayMs;
@@ -137,7 +139,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean,
TableChange, Trigger>
TableMaintenanceMetrics.GROUP_KEY,
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
.counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
this.triggerCounters =
- taskNames.stream()
+ maintenanceTaskNames.stream()
.map(
name ->
getRuntimeContext()
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
index 38bb9c393f..679b3ec508 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
@@ -22,6 +22,7 @@ import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
@@ -156,23 +157,26 @@ class ManualSource<T>
@Override
public SourceReader<T, DummySplit> createReader(SourceReaderContext
sourceReaderContext) {
- return new SourceReader<T, DummySplit>() {
+ return new SourceReader<>() {
@Override
public void start() {
// Do nothing
}
+ @SuppressWarnings("unchecked")
@Override
public InputStatus pollNext(ReaderOutput<T> output) {
Tuple2<T, Long> next = (Tuple2<T, Long>) QUEUES.get(index).poll();
if (next != null) {
if (next.f0 == null) {
- // No more input
- return InputStatus.END_OF_INPUT;
- }
-
- if (next.f1 == null) {
+ if (next.f1 == null) {
+ // No more input
+ return InputStatus.END_OF_INPUT;
+ } else {
+ output.emitWatermark(new Watermark(next.f1));
+ }
+ } else if (next.f1 == null) {
// No event time set
output.collect(next.f0);
} else {
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
similarity index 99%
copy from
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
copy to
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
index ccb90ec33d..cffcc4eb04 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
@@ -24,7 +24,6 @@ import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetr
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
-import java.time.Duration;
import java.util.Collection;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -100,8 +99,8 @@ class TestLockRemover extends OperatorTestBase {
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" +
checkpointDir.getPath());
- config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMillis(10));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.enableCheckpointing(10);
ManualSource<TaskResult> source = new ManualSource<>(env,
TypeInformation.of(TaskResult.class));
source.dataStream().global().sinkTo(new
SinkTest()).name(sinkName).setParallelism(1);
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
index ccb90ec33d..cffcc4eb04 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
@@ -24,7 +24,6 @@ import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetr
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
-import java.time.Duration;
import java.util.Collection;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -100,8 +99,8 @@ class TestLockRemover extends OperatorTestBase {
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" +
checkpointDir.getPath());
- config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMillis(10));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.enableCheckpointing(10);
ManualSource<TaskResult> source = new ManualSource<>(env,
TypeInformation.of(TaskResult.class));
source.dataStream().global().sinkTo(new
SinkTest()).name(sinkName).setParallelism(1);