This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ab0594bf71 Flink: Port #10484 to v1.19 (#11010) (#11117)
ab0594bf71 is described below

commit ab0594bf71a6884ee0e196470bfe4b4d3baa58b9
Author: pvary <[email protected]>
AuthorDate: Thu Sep 12 16:56:17 2024 +0300

    Flink: Port #10484 to v1.19 (#11010) (#11117)
---
 .../maintenance/operator/JdbcLockFactory.java      |   4 +
 .../flink/maintenance/operator/LockRemover.java    | 144 +++++++++++++++++++++
 .../operator/TableMaintenanceMetrics.java          |   5 +
 .../flink/maintenance/operator/TaskResult.java     |  65 ++++++++++
 .../flink/maintenance/operator/TriggerManager.java |  14 +-
 .../flink/maintenance/operator/ManualSource.java   |  16 ++-
 .../maintenance/operator/TestLockRemover.java      |   3 +-
 .../maintenance/operator/TestLockRemover.java      |   3 +-
 8 files changed, 238 insertions(+), 16 deletions(-)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
index f22be33aea..085fbfecd2 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java
@@ -38,6 +38,10 @@ import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * JDBC table backed implementation of the {@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory}.
+ */
 public class JdbcLockFactory implements TriggerLockFactory {
   private static final Logger LOG = LoggerFactory.getLogger(JdbcLockFactory.class);
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
new file mode 100644
index 0000000000..3c3761ef2f
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages locks and collect {@link org.apache.flink.metrics.Metric} for the Maintenance Tasks.
+ *
+ * <p>The assumptions about the locks are the following:
+ *
+ * <ul>
+ *   <li>Every {@link TaskResult} is followed by a {@link Watermark} for normal {@link Trigger}s
+ *   <li>For the {@link Trigger#recovery(long)} {@link Watermark} there is no element to process
+ * </ul>
+ *
+ * When processing the inputs there are 3 possibilities:
+ *
+ * <ul>
+ *   <li>Normal execution - we receive a {@link TaskResult} and then a {@link Watermark} - unlocking
+ *       the lock is handled by the {@link #processElement(StreamRecord)}
+ *   <li>Recovery without ongoing execution (unlocking the recoveryLock) - we receive the {@link
+ *       Trigger#recovery(long)} {@link Watermark} without any {@link TaskResult} - unlocking the
+ *       {@link TriggerLockFactory#createRecoveryLock()} and a possible {@link
+ *       TriggerLockFactory#createLock()} is handled by the {@link #processWatermark(Watermark)}
+ *       (the {@link #lastProcessedTaskStartEpoch} is 0 in this case)
+ *   <li>Recovery with an ongoing execution - we receive a {@link TaskResult} and then a {@link
+ *       Watermark} - unlocking the {@link TriggerLockFactory#createLock()} is handled by the {@link
+ *       #processElement(StreamRecord)}, unlocking the {@link
+ *       TriggerLockFactory#createRecoveryLock()} is handled by the {@link
+ *       #processWatermark(Watermark)} (the {@link #lastProcessedTaskStartEpoch} is the start time
+ *       of the old task)
+ * </ul>
+ */
+@Internal
+public class LockRemover extends AbstractStreamOperator<Void>
+    implements OneInputStreamOperator<TaskResult, Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(LockRemover.class);
+
+  private final TriggerLockFactory lockFactory;
+  private final List<String> maintenanceTaskNames;
+
+  private transient List<Counter> succeededTaskResultCounters;
+  private transient List<Counter> failedTaskResultCounters;
+  private transient List<AtomicLong> taskLastRunDurationMs;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient long lastProcessedTaskStartEpoch = 0L;
+
+  public LockRemover(TriggerLockFactory lockFactory, List<String> maintenanceTaskNames) {
+    Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+    Preconditions.checkArgument(
+        maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(),
+        "Invalid maintenance task names: null or empty");
+
+    this.lockFactory = lockFactory;
+    this.maintenanceTaskNames = maintenanceTaskNames;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    this.succeededTaskResultCounters =
+        Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+    this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+    this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+    for (String name : maintenanceTaskNames) {
+      succeededTaskResultCounters.add(
+          getRuntimeContext()
+              .getMetricGroup()
+              .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+              .counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
+      failedTaskResultCounters.add(
+          getRuntimeContext()
+              .getMetricGroup()
+              .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+              .counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
+      AtomicLong duration = new AtomicLong(0);
+      taskLastRunDurationMs.add(duration);
+      getRuntimeContext()
+          .getMetricGroup()
+          .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+          .gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get);
+    }
+
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+  }
+
+  @Override
+  public void processElement(StreamRecord<TaskResult> streamRecord) {
+    TaskResult taskResult = streamRecord.getValue();
+    LOG.info(
+        "Processing result {} for task {}",
+        taskResult,
+        maintenanceTaskNames.get(taskResult.taskIndex()));
+    long duration = System.currentTimeMillis() - taskResult.startEpoch();
+    lock.unlock();
+    this.lastProcessedTaskStartEpoch = taskResult.startEpoch();
+
+    // Update the metrics
+    taskLastRunDurationMs.get(taskResult.taskIndex()).set(duration);
+    if (taskResult.success()) {
+      succeededTaskResultCounters.get(taskResult.taskIndex()).inc();
+    } else {
+      failedTaskResultCounters.get(taskResult.taskIndex()).inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) {
+    if (mark.getTimestamp() > lastProcessedTaskStartEpoch) {
+      lock.unlock();
+      recoveryLock.unlock();
+    }
+  }
+}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
index ec0fd920c3..1a04461aed 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
@@ -28,6 +28,11 @@ public class TableMaintenanceMetrics {
   public static final String TRIGGERED = "triggered";
   public static final String NOTHING_TO_TRIGGER = "nothingToTrigger";
 
+  // LockRemover metrics
+  public static final String SUCCEEDED_TASK_COUNTER = "succeededTasks";
+  public static final String FAILED_TASK_COUNTER = "failedTasks";
+  public static final String LAST_RUN_DURATION_MS = "lastRunDurationMs";
+
   private TableMaintenanceMetrics() {
     // do not instantiate
   }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java
new file mode 100644
index 0000000000..06f10f1c1d
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/** The result of a single Maintenance Task. */
+@Internal
+public class TaskResult {
+  private final int taskIndex;
+  private final long startEpoch;
+  private final boolean success;
+  private final List<Exception> exceptions;
+
+  public TaskResult(int taskIndex, long startEpoch, boolean success, List<Exception> exceptions) {
+    this.taskIndex = taskIndex;
+    this.startEpoch = startEpoch;
+    this.success = success;
+    this.exceptions = exceptions;
+  }
+
+  public int taskIndex() {
+    return taskIndex;
+  }
+
+  public long startEpoch() {
+    return startEpoch;
+  }
+
+  public boolean success() {
+    return success;
+  }
+
+  public List<Exception> exceptions() {
+    return exceptions;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("taskIndex", taskIndex)
+        .add("startEpoch", startEpoch)
+        .add("success", success)
+        .add("exceptions", exceptions)
+        .toString();
+  }
+}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
index f4c3c1d47c..dc95b27af0 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -63,7 +63,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
 
   private final TableLoader tableLoader;
   private final TriggerLockFactory lockFactory;
-  private final List<String> taskNames;
+  private final List<String> maintenanceTaskNames;
   private final List<TriggerEvaluator> evaluators;
   private final long minFireDelayMs;
   private final long lockCheckDelayMs;
@@ -92,25 +92,27 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
   TriggerManager(
       TableLoader tableLoader,
       TriggerLockFactory lockFactory,
-      List<String> taskNames,
+      List<String> maintenanceTaskNames,
       List<TriggerEvaluator> evaluators,
       long minFireDelayMs,
       long lockCheckDelayMs) {
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
     Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
     Preconditions.checkArgument(
-        taskNames != null && !taskNames.isEmpty(), "Invalid task names: null or empty");
+        maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(),
+        "Invalid maintenance task names: null or empty");
     Preconditions.checkArgument(
         evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null or empty");
     Preconditions.checkArgument(
-        taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks");
+        maintenanceTaskNames.size() == evaluators.size(),
+        "Provide a name and evaluator for all of the maintenance tasks");
     Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1.");
     Preconditions.checkArgument(
         lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 ms.");
 
     this.tableLoader = tableLoader;
     this.lockFactory = lockFactory;
-    this.taskNames = taskNames;
+    this.maintenanceTaskNames = maintenanceTaskNames;
     this.evaluators = evaluators;
     this.minFireDelayMs = minFireDelayMs;
     this.lockCheckDelayMs = lockCheckDelayMs;
@@ -137,7 +139,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
                 TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
             .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
     this.triggerCounters =
-        taskNames.stream()
+        maintenanceTaskNames.stream()
             .map(
                 name ->
                     getRuntimeContext()
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
index 38bb9c393f..679b3ec508 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
@@ -22,6 +22,7 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -156,23 +157,26 @@ class ManualSource<T>
 
   @Override
   public SourceReader<T, DummySplit> createReader(SourceReaderContext sourceReaderContext) {
-    return new SourceReader<T, DummySplit>() {
+    return new SourceReader<>() {
       @Override
       public void start() {
         // Do nothing
       }
 
+      @SuppressWarnings("unchecked")
       @Override
       public InputStatus pollNext(ReaderOutput<T> output) {
         Tuple2<T, Long> next = (Tuple2<T, Long>) QUEUES.get(index).poll();
 
         if (next != null) {
           if (next.f0 == null) {
-            // No more input
-            return InputStatus.END_OF_INPUT;
-          }
-
-          if (next.f1 == null) {
+            if (next.f1 == null) {
+              // No more input
+              return InputStatus.END_OF_INPUT;
+            } else {
+              output.emitWatermark(new Watermark(next.f1));
+            }
+          } else if (next.f1 == null) {
             // No event time set
             output.collect(next.f0);
           } else {
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
similarity index 99%
copy from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
copy to flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
index ccb90ec33d..cffcc4eb04 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
@@ -24,7 +24,6 @@ import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetr
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
-import java.time.Duration;
 import java.util.Collection;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -100,8 +99,8 @@ class TestLockRemover extends OperatorTestBase {
     Configuration config = new Configuration();
     config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
     config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + checkpointDir.getPath());
-    config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(10));
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+    env.enableCheckpointing(10);
     ManualSource<TaskResult> source = new ManualSource<>(env, TypeInformation.of(TaskResult.class));
     source.dataStream().global().sinkTo(new SinkTest()).name(sinkName).setParallelism(1);
 
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
index ccb90ec33d..cffcc4eb04 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java
@@ -24,7 +24,6 @@ import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetr
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
-import java.time.Duration;
 import java.util.Collection;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -100,8 +99,8 @@ class TestLockRemover extends OperatorTestBase {
     Configuration config = new Configuration();
     config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
     config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + checkpointDir.getPath());
-    config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(10));
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+    env.enableCheckpointing(10);
     ManualSource<TaskResult> source = new ManualSource<>(env, TypeInformation.of(TaskResult.class));
     source.dataStream().global().sinkTo(new SinkTest()).name(sinkName).setParallelism(1);
 

Reply via email to