mxm commented on code in PR #15996:
URL: https://github.com/apache/iceberg/pull/15996#discussion_r3225168960


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Emissions are grouped into phases separated by watermarks at the boundaries that gate the
+ * worker's keyed state:
+ *
+ * <ol>
+ *   <li>Main data — read once per new equality-field-set to create the worker's key index.
+ *   <li>Equality + positional deletes — eq deletes are resolved against the index in the worker;
+ *       pos deletes bypass the worker via a side output and are part of the same DV merge.
+ *   <li>New staging data — added to the index for the next cycle.
+ * </ol>
+ *
+ * The planner also detects external commits on main (not produced by the converter itself) and
+ * re-emits main data so the worker can rebuild its index before resolving the current cycle's
+ * equality deletes.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = 
"processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = 
"processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = 
"skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Main snapshot id the worker's index reflects.
+  private transient ListState<Long> indexSnapshotState;
+  // Equality field sets for which main data has been emitted.
+  private transient ListState<List<Integer>> mainIndexEmittedState;
+
+  private transient Table table;
+
+  private transient Long lastMainSnapshotId;
+  private transient Long lastStagingSnapshotId;
+  private transient Long indexSnapshotId;
+
+  // Sorted equality-field-id lists for which main data has already been 
emitted to create the
+  // worker index. One entry per distinct key schema. Checkpointed so 
first-time emission survives
+  // restarts.
+  private transient Set<List<Integer>> mainIndexEmittedSet;
+
+  private transient long nextPhaseTs;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.processedEqDeleteFileNumCounter =
+        taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    this.processedStagingSnapshotNumCounter =
+        taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    this.skippedNoOpCyclesCounter = 
taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    mainIndexEmittedState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("mainIndexEmitted", 
Types.LIST(Types.INT)));
+    indexSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("indexSnapshotId", 
Types.LONG));
+
+    indexSnapshotId = getValue(indexSnapshotState);
+
+    mainIndexEmittedSet = Sets.newHashSet();
+    for (List<Integer> fieldSet : mainIndexEmittedState.get()) {
+      mainIndexEmittedSet.add(fieldSet);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    storeValue(indexSnapshotState, indexSnapshotId);
+
+    mainIndexEmittedState.clear();
+    for (List<Integer> fieldSet : mainIndexEmittedSet) {
+      mainIndexEmittedState.add(Lists.newArrayList(fieldSet));
+    }
+  }
+
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    nextPhaseTs = Math.max(triggerTs, nextPhaseTs + 1);
+
+    try {
+      table.refresh();
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+      updateState(mainSnapshot);
+
+      Snapshot stagingSnapshot = table.snapshot(stagingBranch);
+      if (stagingSnapshot == null) {
+        LOG.info("No snapshot on staging branch '{}', nothing to convert.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      Snapshot nextSnapshot = findNextSnapshot(stagingSnapshot);
+      if (nextSnapshot == null) {
+        LOG.info("No new staging snapshots to process on branch '{}'.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      emitReadCommandsForSnapshot(nextSnapshot, triggerTs, 
currentMainSnapshotId);
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+    }
+  }
+
+  private void updateState(Snapshot mainSnapshot) {
+    Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+    int externalCount = 0;
+    Snapshot current = mainSnapshot;
+    while (current != null) {
+      String prop =
+          
current.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (prop != null) {
+        // First property found going back from main head is our most recent 
commit.
+        lastStagingSnapshotId = Long.parseLong(prop);
+        break;
+      }
+
+      externalCount++;
+      current = current.parentId() != null ? 
table.snapshot(current.parentId()) : null;
+    }
+
+    boolean mainChanged =
+        lastMainSnapshotId != null && 
!lastMainSnapshotId.equals(currentMainSnapshotId);
+    if (mainChanged && externalCount > 0 && !mainIndexEmittedSet.isEmpty()) {
+      LOG.info(
+          "Detected {} external commit(s) on branch '{}' since previous cycle, 
reindexing.",
+          externalCount,
+          targetBranch);
+      indexSnapshotId = currentMainSnapshotId;
+      // Full reindex: re-emits all data files from the current main snapshot 
for every equality
+      // field set that was previously indexed. This is triggered when an 
external commit (e.g.
+      // compaction) changes the main branch and the existing index may 
reference stale file paths.
+      emitReindexFromMain(mainSnapshot);
+      reindexCounter.inc();
+    } else if (mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+    }
+
+    lastMainSnapshotId = currentMainSnapshotId;
+  }
+
+  private void emitReadCommandsForSnapshot(
+      Snapshot stagingSnapshot, long triggerTs, Long currentMainSnapshotId) {
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> posDeleteFiles = Lists.newArrayList();
+    Map<List<Integer>, List<DeleteFile>> deletesByFieldIds = Maps.newHashMap();
+    SnapshotChanges changes = 
SnapshotChanges.builderFor(table).snapshot(stagingSnapshot).build();
+
+    // We do not support staging snapshots that remove data files: compaction 
/ overwrite rewrites
+    // on the staging branch would require us to rewrite the corresponding DVs 
against the new data
+    // files on target, which isn't implemented here. Fail fast so the 
operator doesn't silently
+    // drop work.
+    if (changes.removedDataFiles().iterator().hasNext()) {
+      throw new IllegalStateException(
+          String.format(
+              "Staging snapshot %s on branch '%s' removes data files; "
+                  + "equality delete conversion does not support rewrites on 
the staging branch. "
+                  + "Run compaction on the target branch instead.",
+              stagingSnapshot.snapshotId(), stagingBranch));
+    }
+
+    // SnapshotChanges' iterators are backed by manifest readers which reuse 
the same DataFile /
+    // DeleteFile instance across iterations, so we have to copy before 
buffering.
+    for (DataFile dataFile : changes.addedDataFiles()) {
+      newDataFiles.add(dataFile.copy());
+    }
+
+    for (DeleteFile deleteFile : changes.addedDeleteFiles()) {
+      if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+        deletesByFieldIds
+            .computeIfAbsent(deleteFile.equalityFieldIds(), k -> 
Lists.newArrayList())
+            .add(deleteFile.copy());
+      } else if (ContentFileUtil.isDV(deleteFile)) {
+        stagingDVFiles.add(deleteFile.copy());
+      } else {
+        // V2 positional delete files can still appear on the staging branch 
(e.g. from V2 writers
+        // or legacy commits). The converter does not produce new positional 
deletes — it reads
+        // these files and feeds their (path, pos) pairs into the same DV 
merge as the eq-delete
+        // resolutions so the absorbed positions land in the target branch as 
DVs.
+        posDeleteFiles.add(deleteFile.copy());
+      }
+    }
+
+    boolean hasEqDeletes = !deletesByFieldIds.isEmpty();
+
+    if (newDataFiles.isEmpty()
+        && !hasEqDeletes
+        && stagingDVFiles.isEmpty()
+        && posDeleteFiles.isEmpty()) {
+      LOG.info("No new files on staging branch '{}' to convert.", 
stagingBranch);
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+      return;
+    }
+
+    // Emit read commands in phases separated by watermarks. The watermarks 
are required to ensure
+    // the worker task processes each phase separately:
+    //   Phase 0: main data files (index refresh)
+    //   Phase 1: equality delete files  (resolution against the index)
+    //          + positional deletes files (converted directly to DVPositions, 
bypass worker)
+    //   Phase 2: staging data files (index update for next cycle)
+    emitMainDataPhase(deletesByFieldIds);
+    emitDeletePhase(deletesByFieldIds, posDeleteFiles);
+    emitSnapshotDataPhase(stagingSnapshot, newDataFiles, mainIndexEmittedSet);
+
+    LOG.info(
+        "Emitted read commands for {} new data files from staging branch 
'{}'.",
+        newDataFiles.size(),
+        stagingBranch);
+
+    processedStagingSnapshotNumCounter.inc();
+
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                newDataFiles,
+                stagingDVFiles,
+                stagingSnapshot.snapshotId(),
+                currentMainSnapshotId,
+                triggerTs,
+                nextPhaseTs)));
+
+    output.emitWatermark(new Watermark(nextPhaseTs));
+  }
+
+  private void emitMainDataPhase(Map<List<Integer>, List<DeleteFile>> 
deletesByFieldIds) {
+    Snapshot mainSnapshot = table.snapshot(targetBranch);
+
+    for (Map.Entry<List<Integer>, List<DeleteFile>> entry : 
deletesByFieldIds.entrySet()) {
+      List<Integer> fieldIds = entry.getKey();
+      List<Integer> sortedFieldIds = Lists.newArrayList(fieldIds);
+      Collections.sort(sortedFieldIds);
+
+      if (!mainIndexEmittedSet.contains(sortedFieldIds)) {
+        emitMainDataReadCommands(mainSnapshot, fieldIds, nextPhaseTs);
+        mainIndexEmittedSet.add(sortedFieldIds);
+      }
+    }
+
+    advancePhase();
+  }
+
+  private void emitDeletePhase(
+      Map<List<Integer>, List<DeleteFile>> deletesByFieldIds, List<DeleteFile> 
posDeleteFiles) {
+    for (Map.Entry<List<Integer>, List<DeleteFile>> entry : 
deletesByFieldIds.entrySet()) {
+      List<Integer> fieldIds = entry.getKey();
+      for (DeleteFile deleteFile : entry.getValue()) {
+        PartitionSpec spec = table.specs().get(deleteFile.specId());
+        output.collect(
+            new StreamRecord<>(
+                ReadCommand.eqDeleteFile(deleteFile, spec, fieldIds, 
indexSnapshotId),
+                nextPhaseTs));
+        processedEqDeleteFileNumCounter.inc();
+      }
+    }
+
+    for (DeleteFile deleteFile : posDeleteFiles) {

Review Comment:
   We now reject V2 positional deletes (if any).



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Emissions are grouped into phases separated by watermarks at the boundaries that gate the
+ * worker's keyed state:
+ *
+ * <ol>
+ *   <li>Main data — read once per new equality-field-set to create the worker's key index.
+ *   <li>Equality + positional deletes — eq deletes are resolved against the index in the worker;
+ *       pos deletes bypass the worker via a side output and are part of the same DV merge.
+ *   <li>New staging data — added to the index for the next cycle.
+ * </ol>
+ *
+ * The planner also detects external commits on main (not produced by the converter itself) and
+ * re-emits main data so the worker can rebuild its index before resolving the current cycle's
+ * equality deletes.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = 
"processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = 
"processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = 
"skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Main snapshot id the worker's index reflects.
+  private transient ListState<Long> indexSnapshotState;
+  // Equality field sets for which main data has been emitted.
+  private transient ListState<List<Integer>> mainIndexEmittedState;
+
+  private transient Table table;
+
+  private transient Long lastMainSnapshotId;
+  private transient Long lastStagingSnapshotId;
+  private transient Long indexSnapshotId;
+
+  // Sorted equality-field-id lists for which main data has already been 
emitted to create the
+  // worker index. One entry per distinct key schema. Checkpointed so 
first-time emission survives
+  // restarts.
+  private transient Set<List<Integer>> mainIndexEmittedSet;
+
+  private transient long nextPhaseTs;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.processedEqDeleteFileNumCounter =
+        taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    this.processedStagingSnapshotNumCounter =
+        taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    this.skippedNoOpCyclesCounter = 
taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    mainIndexEmittedState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("mainIndexEmitted", 
Types.LIST(Types.INT)));
+    indexSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("indexSnapshotId", 
Types.LONG));
+
+    indexSnapshotId = getValue(indexSnapshotState);
+
+    mainIndexEmittedSet = Sets.newHashSet();
+    for (List<Integer> fieldSet : mainIndexEmittedState.get()) {
+      mainIndexEmittedSet.add(fieldSet);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    storeValue(indexSnapshotState, indexSnapshotId);
+
+    mainIndexEmittedState.clear();
+    for (List<Integer> fieldSet : mainIndexEmittedSet) {
+      mainIndexEmittedState.add(Lists.newArrayList(fieldSet));
+    }
+  }
+
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    nextPhaseTs = Math.max(triggerTs, nextPhaseTs + 1);
+
+    try {
+      table.refresh();
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+      updateState(mainSnapshot);
+
+      Snapshot stagingSnapshot = table.snapshot(stagingBranch);
+      if (stagingSnapshot == null) {
+        LOG.info("No snapshot on staging branch '{}', nothing to convert.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      Snapshot nextSnapshot = findNextSnapshot(stagingSnapshot);
+      if (nextSnapshot == null) {
+        LOG.info("No new staging snapshots to process on branch '{}'.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      emitReadCommandsForSnapshot(nextSnapshot, triggerTs, 
currentMainSnapshotId);
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+    }
+  }
+
+  private void updateState(Snapshot mainSnapshot) {
+    Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+    int externalCount = 0;
+    Snapshot current = mainSnapshot;
+    while (current != null) {
+      String prop =
+          
current.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (prop != null) {
+        // First property found going back from main head is our most recent 
commit.
+        lastStagingSnapshotId = Long.parseLong(prop);
+        break;
+      }
+
+      externalCount++;
+      current = current.parentId() != null ? 
table.snapshot(current.parentId()) : null;
+    }
+
+    boolean mainChanged =
+        lastMainSnapshotId != null && 
!lastMainSnapshotId.equals(currentMainSnapshotId);
+    if (mainChanged && externalCount > 0 && !mainIndexEmittedSet.isEmpty()) {
+      LOG.info(
+          "Detected {} external commit(s) on branch '{}' since previous cycle, 
reindexing.",
+          externalCount,
+          targetBranch);
+      indexSnapshotId = currentMainSnapshotId;
+      // Full reindex: re-emits all data files from the current main snapshot 
for every equality
+      // field set that was previously indexed. This is triggered when an 
external commit (e.g.
+      // compaction) changes the main branch and the existing index may 
reference stale file paths.
+      emitReindexFromMain(mainSnapshot);
+      reindexCounter.inc();
+    } else if (mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+    }
+
+    lastMainSnapshotId = currentMainSnapshotId;
+  }
+
+  private void emitReadCommandsForSnapshot(
+      Snapshot stagingSnapshot, long triggerTs, Long currentMainSnapshotId) {
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> posDeleteFiles = Lists.newArrayList();
+    Map<List<Integer>, List<DeleteFile>> deletesByFieldIds = Maps.newHashMap();
+    SnapshotChanges changes = 
SnapshotChanges.builderFor(table).snapshot(stagingSnapshot).build();
+
+    // We do not support staging snapshots that remove data files: compaction 
/ overwrite rewrites
+    // on the staging branch would require us to rewrite the corresponding DVs 
against the new data
+    // files on target, which isn't implemented here. Fail fast so the 
operator doesn't silently
+    // drop work.
+    if (changes.removedDataFiles().iterator().hasNext()) {
+      throw new IllegalStateException(
+          String.format(
+              "Staging snapshot %s on branch '%s' removes data files; "
+                  + "equality delete conversion does not support rewrites on 
the staging branch. "
+                  + "Run compaction on the target branch instead.",
+              stagingSnapshot.snapshotId(), stagingBranch));
+    }
+
+    // SnapshotChanges' iterators are backed by manifest readers which reuse 
the same DataFile /
+    // DeleteFile instance across iterations, so we have to copy before 
buffering.
+    for (DataFile dataFile : changes.addedDataFiles()) {
+      newDataFiles.add(dataFile.copy());
+    }
+
+    for (DeleteFile deleteFile : changes.addedDeleteFiles()) {
+      if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+        deletesByFieldIds
+            .computeIfAbsent(deleteFile.equalityFieldIds(), k -> 
Lists.newArrayList())
+            .add(deleteFile.copy());
+      } else if (ContentFileUtil.isDV(deleteFile)) {
+        stagingDVFiles.add(deleteFile.copy());
+      } else {
+        // V2 positional delete files can still appear on the staging branch 
(e.g. from V2 writers
+        // or legacy commits). The converter does not produce new positional 
deletes — it reads
+        // these files and feeds their (path, pos) pairs into the same DV 
merge as the eq-delete
+        // resolutions so the absorbed positions land in the target branch as 
DVs.
+        posDeleteFiles.add(deleteFile.copy());
+      }
+    }
+
+    boolean hasEqDeletes = !deletesByFieldIds.isEmpty();
+
+    if (newDataFiles.isEmpty()
+        && !hasEqDeletes
+        && stagingDVFiles.isEmpty()
+        && posDeleteFiles.isEmpty()) {
+      LOG.info("No new files on staging branch '{}' to convert.", 
stagingBranch);
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+      return;
+    }
+
+    // Emit read commands in phases separated by watermarks. The watermarks 
are required to ensure
+    // the worker task processes each phase separately:
+    //   Phase 0: main data files (index refresh)
+    //   Phase 1: equality delete files  (resolution against the index)
+    //          + positional deletes files (converted directly to DVPositions, 
bypass worker)
+    //   Phase 2: staging data files (index update for next cycle)
+    emitMainDataPhase(deletesByFieldIds);
+    emitDeletePhase(deletesByFieldIds, posDeleteFiles);
+    emitSnapshotDataPhase(stagingSnapshot, newDataFiles, mainIndexEmittedSet);
+
+    LOG.info(
+        "Emitted read commands for {} new data files from staging branch 
'{}'.",
+        newDataFiles.size(),
+        stagingBranch);
+
+    processedStagingSnapshotNumCounter.inc();
+
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                newDataFiles,
+                stagingDVFiles,
+                stagingSnapshot.snapshotId(),
+                currentMainSnapshotId,
+                triggerTs,
+                nextPhaseTs)));
+
+    output.emitWatermark(new Watermark(nextPhaseTs));

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to