pvary commented on code in PR #15996:
URL: https://github.com/apache/iceberg/pull/15996#discussion_r3280284004


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertCommitter.java:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits data files and DVs to the main branch. Receives {@link 
DVMergeResult}s from parallel
+ * {@link EqualityConvertDVResolver} instances (input 1) and an {@link 
EqualityConvertPlanResult}
+ * from the planner (input 2). Assembles the final file lists and commits 
using a {@link RowDelta}
+ * operation once the plan result and done-timestamp watermark have both 
arrived.
+ *
+ * <p>Watermarks are always forwarded downstream, including the one that completes a cycle.
+ *
+ * <p>Emits a {@link Trigger} after each cycle (commit, no-op, or error) so 
the downstream {@link
+ * TaskResultAggregator} can track task completion. This is the sole source of 
Trigger records for
+ * the Aggregator.
+ *
+ * <p>No-op vs error: a no-op cycle (empty plan result from {@code
+ * EqualityConvertPlanner.emitNoOpResult}) returns early in {@code 
commitIfNeeded} without writing
+ * anything; the Trigger emit in {@code processWatermark} still happens, so 
the maintenance task
+ * completes cleanly. Errors are reported via {@link 
TaskResultAggregator#ERROR_STREAM} side output;
+ * the Aggregator collects them and surfaces failure on its own watermark.
+ *
+ * <p>The committer is intentionally stateless: {@code bufferedResults} and 
{@code planResult} are
+ * not checkpointed. On restart Flink replays the upstream pipeline (planner 
-> reader -> worker ->
+ * DV resolver) from the planner's last-checkpointed position, so both fields 
are rebuilt from the
+ * replayed stream before the cycle's done-timestamp watermark fires. 
Idempotency at commit time is
+ * ensured by the {@link #COMMITTED_STAGING_SNAPSHOT_PROPERTY} check in {@code 
isAlreadyCommitted}:
+ * if the staging snapshot is already on target, the replayed commit is 
skipped.
+ *
+ * <p>On restart the planner re-derives its position from {@link
+ * #COMMITTED_STAGING_SNAPSHOT_PROPERTY} on main and replays any in-progress cycle.
+ */
+@Internal
+public class EqualityConvertCommitter extends AbstractStreamOperator<Trigger>
+    implements TwoInputStreamOperator<DVMergeResult, 
EqualityConvertPlanResult, Trigger> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertCommitter.class);
+
+  static final String COMMITTED_STAGING_SNAPSHOT_PROPERTY = 
"equality-convert-staging-snapshot";
+
+  private static final String ADDED_DV_NUM_METRIC = "addedDvNum";
+  private static final String COMMIT_DURATION_MS_METRIC = "commitDurationMs";
+
+  private final String tableName;
+  private final String taskName;
+  private final TableLoader tableLoader;
+  private final String targetBranch;
+  private final boolean stagingOnTargetBranch;
+
+  private transient Table table;
+  private transient List<DVMergeResult> bufferedResults;
+  private transient EqualityConvertPlanResult planResult;
+
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter addedDvNumCounter;
+  private transient Counter commitDurationMsCounter;
+
+  public EqualityConvertCommitter(
+      String tableName,
+      String taskName,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.tableLoader = tableLoader;
+    this.targetBranch = targetBranch;
+    this.stagingOnTargetBranch = stagingBranch.equals(targetBranch);
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    this.table = tableLoader.loadTable();
+    this.bufferedResults = Lists.newArrayList();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, 0);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.addedDvNumCounter = taskMetricGroup.counter(ADDED_DV_NUM_METRIC);
+    this.commitDurationMsCounter = 
taskMetricGroup.counter(COMMIT_DURATION_MS_METRIC);
+  }
+
+  @Override
+  public void processElement1(StreamRecord<DVMergeResult> record) {
+    bufferedResults.add(record.getValue());
+  }
+
+  @Override
+  public void processElement2(StreamRecord<EqualityConvertPlanResult> record) {
+    planResult = record.getValue();
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    if (planResult != null && mark.getTimestamp() >= 
planResult.doneTimestamp()) {
+      try {
+        commitIfNeeded();
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to commit equality convert result for table {} task {}",
+            tableName,
+            taskName,
+            e);
+        output.collect(TaskResultAggregator.ERROR_STREAM, new 
StreamRecord<>(e));
+        errorCounter.inc();
+      }
+
+      // Emit Trigger for the Aggregator (even on error or no-op).
+      output.collect(new 
StreamRecord<>(Trigger.create(planResult.triggerTimestamp(), 0)));
+
+      bufferedResults.clear();
+      planResult = null;
+    }
+
+    // Always forward watermarks to prevent stalling downstream.
+    super.processWatermark(mark);
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    tableLoader.close();
+  }
+
+  private void commitIfNeeded() {
+    for (DVMergeResult result : bufferedResults) {
+      if (result.hasError()) {
+        LOG.warn(
+            "Skipping commit for table {} task {}: a DV resolver reported an 
error.",
+            tableName,
+            taskName);
+        return;
+      }
+    }
+
+    // No-op cycle: the planner emitted an empty plan result (see
+    // EqualityConvertPlanner.emitNoOpResult) because the next staging 
snapshot was filtered by
+    // shouldSkip or there was nothing new on staging. processWatermark still 
forwards a Trigger to
+    // TaskResultAggregator, so the maintenance task completes cleanly.
+    if (planResult.noOp()) {
+      return;
+    }
+
+    table.refresh();
+
+    // Idempotency: skip if this staging snapshot was already committed 
(replay after failure).
+    if (planResult.stagingSnapshotId() > 0
+        && isAlreadyCommitted(table, targetBranch, 
planResult.stagingSnapshotId())) {
+      LOG.info(
+          "Staging snapshot {} already committed to branch '{}', skipping.",
+          planResult.stagingSnapshotId(),
+          targetBranch);
+      return;
+    }
+
+    List<DeleteFile> allDvFiles = Lists.newArrayList();
+    List<DeleteFile> allRewrittenDvFiles = Lists.newArrayList();
+    for (DVMergeResult result : bufferedResults) {
+      allDvFiles.addAll(result.dvFiles());
+      allRewrittenDvFiles.addAll(result.rewrittenDvFiles());
+    }
+
+    RowDelta rowDelta = buildRowDelta(planResult.dataFiles(), allDvFiles, 
allRewrittenDvFiles);
+
+    long startNano = System.nanoTime();
+    rowDelta.commit();
+    long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano);
+    commitDurationMsCounter.inc(durationMs);
+
+    LOG.info(
+        "Committed {} data files and {} DV files to branch '{}' for table {} 
in {} ms. "
+            + "Processed staging snapshot {}.",
+        planResult.dataFiles().size(),
+        allDvFiles.size(),
+        targetBranch,
+        tableName,
+        durationMs,
+        planResult.stagingSnapshotId());
+
+    // Only count files actually added by this commit. When staging and target are the same
+    // branch (stagingOnTargetBranch), the writer already committed the data files to target
+    // and buildRowDelta does not re-add them.
+    if (!stagingOnTargetBranch) {
+      for (DataFile dataFile : planResult.dataFiles()) {
+        addedDataFileNumCounter.inc();
+        addedDataFileSizeCounter.inc(dataFile.fileSizeInBytes());
+      }
+    }
+
+    addedDvNumCounter.inc(allDvFiles.size());
+  }
+
+  private RowDelta buildRowDelta(
+      List<DataFile> dataFiles, List<DeleteFile> allDvFiles, List<DeleteFile> 
allRewrittenDvFiles) {
+    RowDelta rowDelta = table.newRowDelta();
+
+    // Fail the commit on external target-branch activity since the planner's 
snapshot. The next
+    // trigger detects the change and reindexes; replay is guarded by 
isAlreadyCommitted.
+
+    if (planResult.mainSnapshotId() != null) {
+      rowDelta.validateFromSnapshot(planResult.mainSnapshotId());
+    }
+
+    rowDelta.validateNoConflictingDataFiles();
+    rowDelta.validateNoConflictingDeleteFiles();
+
+    Set<String> referencedDataFiles = Sets.newHashSet();
+    for (DeleteFile dvFile : allDvFiles) {
+      if (dvFile.referencedDataFile() != null) {
+        referencedDataFiles.add(dvFile.referencedDataFile());
+      }
+    }
+
+    if (!referencedDataFiles.isEmpty()) {
+      rowDelta.validateDataFilesExist(referencedDataFiles);
+    }
+
+    // When stagingBranch == targetBranch, the writer already committed data 
files and DVs to the
+    // target branch. Re-adding them here would produce duplicate manifest 
entries. Skip those
+    // paths; only the new DVs from the resolver need to be added.
+    if (!stagingOnTargetBranch) {
+      for (DataFile dataFile : dataFiles) {
+        rowDelta.addRows(dataFile);
+      }
+    }
+
+    for (DeleteFile dvFile : allDvFiles) {
+      rowDelta.addDeletes(dvFile);
+    }
+
+    if (!stagingOnTargetBranch) {
+      addStagingDeletes(rowDelta, allDvFiles, planResult.stagingDVFiles());
+    }
+
+    removeRewrittenDVs(rowDelta, allRewrittenDvFiles, 
planResult.stagingDVFiles());
+
+    rowDelta.toBranch(targetBranch);
+    rowDelta.set(
+        COMMITTED_STAGING_SNAPSHOT_PROPERTY, 
String.valueOf(planResult.stagingSnapshotId()));
+    return rowDelta;
+  }
+
+  /** Adds staging delete files, skipping DVs that overlap with conversion DVs 
(V3 rule). */
+  private static void addStagingDeletes(
+      RowDelta rowDelta, List<DeleteFile> allDvFiles, List<DeleteFile> 
stagingDVFiles) {
+    Set<String> dvCoveredDataFiles = Sets.newHashSet();
+    for (DeleteFile dvFile : allDvFiles) {
+      if (dvFile.referencedDataFile() != null) {
+        dvCoveredDataFiles.add(dvFile.referencedDataFile());
+      }
+    }
+
+    for (DeleteFile stagingDelete : stagingDVFiles) {
+      // V3 allows one DV per data file. When a staging snapshot contains both 
a writer-committed
+      // DV and an eq-delete that resolves to additional positions in the same 
data file, the
+      // resolver folds the staging DV into a new merged DV (via
+      // EqualityConvertDVResolver.collectExistingDVs). Skip the superseded 
staging DV; adding both
+      // would commit two DVs for the same data file.
+      if (ContentFileUtil.isDV(stagingDelete)
+          && stagingDelete.referencedDataFile() != null
+          && dvCoveredDataFiles.contains(stagingDelete.referencedDataFile())) {
+        continue;
+      }
+
+      rowDelta.addDeletes(stagingDelete);
+    }
+  }
+
+  /** Removes rewritten DVs, filtering out staging DVs that don't exist on the 
target branch. */
+  private static void removeRewrittenDVs(
+      RowDelta rowDelta, List<DeleteFile> allRewrittenDvFiles, 
List<DeleteFile> stagingDVFiles) {
+    Set<String> stagingDeleteLocations = Sets.newHashSet();
+    for (DeleteFile sd : stagingDVFiles) {
+      stagingDeleteLocations.add(sd.location());
+    }
+
+    for (DeleteFile rewrittenDv : allRewrittenDvFiles) {
+      if (!stagingDeleteLocations.contains(rewrittenDv.location())) {
+        rowDelta.removeDeletes(rewrittenDv);
+      }
+    }
+  }
+
+  private static boolean isAlreadyCommitted(
+      Table table, String targetBranch, long stagingSnapshotId) {
+    Snapshot current = table.snapshot(targetBranch);
+    while (current != null) {
+      String prop = current.summary().get(COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (prop != null) {
+        return Long.parseLong(prop) == stagingSnapshotId;

Review Comment:
   If we accidentally restart from an older state, this could cause duplicate commits.
   Should we use `>=` instead of `==` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to