pvary commented on code in PR #15996: URL: https://github.com/apache/iceberg/pull/15996#discussion_r3266739442
########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java: ########## @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import 
org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotChanges; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Planner for the equality delete conversion pipeline. For each trigger, it picks the oldest + * staging snapshot that hasn't been converted yet and emits {@link ReadCommand}s describing the + * files its downstream readers and workers must process. + * + * <p>Each trigger runs two steps in order: + * + * <ol> + * <li>{@link #ensureIndexCurrent}: updates {@link #lastStagingSnapshotId} from main's history, + * bootstraps the worker index from main on first run, and reindexes when external commits + * (e.g. compaction) have advanced main past the currently-indexed snapshot. + * <li>{@link #processStagingSnapshot}: resolve the chosen staging snapshot's eq deletes against + * the (now-current) index, pass through any DV files, and index the snapshot's new data files + * for the next cycle. + * </ol> + * + * Watermarks separate phases that gate the worker's keyed state. The contract is documented on + * {@link #advancePhase()}. 
+ * + * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata is emitted via the + * {@link #METADATA_STREAM} side output after the read commands. + * + * <p>Assumes a single equality-field set supplied via the builder; staging eq-deletes with a + * different {@code equalityFieldIds} fail fast in {@link #retrieveStagingFiles}. Concurrent writes + * on the target branch are handled by {@link #ensureIndexCurrent} reindexing from the new main + * snapshot; commit-time conflicts are caught by {@code RowDelta.validateFromSnapshot}. + */ +@Internal +public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand> + implements OneInputStreamOperator<Trigger, ReadCommand> { + + private static final Logger LOG = LoggerFactory.getLogger(EqualityConvertPlanner.class); + + public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM = + new OutputTag<>("metadata-stream") {}; + + private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = "processedEqDeleteFileNum"; + private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = "processedStagingSnapshotNum"; + private static final String SKIPPED_NO_OP_CYCLES_METRIC = "skippedNoOpCycles"; + private static final String REINDEX_COUNT_METRIC = "reindexCount"; + + private final String tableName; + private final String taskName; + private final TableLoader tableLoader; + private final String stagingBranch; + private final String targetBranch; + // Equality-field-id list the worker keys on. Supplied via the builder; every staging + // eq-delete's equalityFieldIds() must match exactly. + private final List<Integer> eqFieldIds; + + // Main snapshot id the worker's index reflects. 
+ private transient ListState<Long> indexSnapshotState; + + private transient Table table; + + private transient Long lastMainSnapshotId; + private transient Long lastStagingSnapshotId; + private transient Long indexSnapshotId; + + private transient long nextPhaseTs; + + // Per-trigger cache of SnapshotChanges to avoid re-parsing manifest lists. + private transient Map<Long, SnapshotChanges> snapshotChangesCache; + + private transient Counter processedEqDeleteFileNumCounter; + private transient Counter processedStagingSnapshotNumCounter; + private transient Counter skippedNoOpCyclesCounter; + private transient Counter reindexCounter; + + public EqualityConvertPlanner( + String tableName, + String taskName, + TableLoader tableLoader, + String stagingBranch, + String targetBranch, + List<Integer> eqFieldIds) { + this.tableName = tableName; + this.taskName = taskName; + this.tableLoader = tableLoader; + this.stagingBranch = stagingBranch; + this.targetBranch = targetBranch; + Preconditions.checkArgument( + eqFieldIds != null && !eqFieldIds.isEmpty(), "eqFieldIds must not be null or empty"); + this.eqFieldIds = ImmutableList.copyOf(eqFieldIds); + } + + @Override + public void open() throws Exception { + super.open(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + + table = tableLoader.loadTable(); + + MetricGroup taskMetricGroup = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, 0); + this.processedEqDeleteFileNumCounter = + taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC); + this.processedStagingSnapshotNumCounter = + taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC); + this.skippedNoOpCyclesCounter = taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC); + this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC); + this.snapshotChangesCache = Maps.newHashMap(); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + 
super.initializeState(context); + indexSnapshotState = + context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("indexSnapshotId", Types.LONG)); + + indexSnapshotId = null; + for (Long stateValue : indexSnapshotState.get()) { + Preconditions.checkState( + indexSnapshotId == null, "indexSnapshotId state should hold at most one value"); + indexSnapshotId = stateValue; + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + indexSnapshotState.clear(); + if (indexSnapshotId != null) { + indexSnapshotState.add(indexSnapshotId); + } + } + + @Override + public void processElement(StreamRecord<Trigger> element) throws Exception { + long triggerTs = element.getTimestamp(); + nextPhaseTs = Math.max(triggerTs, nextPhaseTs + 1); + snapshotChangesCache.clear(); + + try { + table.refresh(); + Snapshot mainSnapshot = table.snapshot(targetBranch); + Long currentMainSnapshotId = mainSnapshot != null ? mainSnapshot.snapshotId() : null; + + ensureIndexCurrent(mainSnapshot); + + Snapshot nextToProcess = + nextUnprocessedStagingSnapshot(table.snapshot(stagingBranch), mainSnapshot); + + if (nextToProcess == null) { + LOG.info("Nothing new to convert on staging branch '{}'.", stagingBranch); + emitNoOpResult(triggerTs, currentMainSnapshotId); + return; + } + + processStagingSnapshot(nextToProcess, triggerTs, currentMainSnapshotId); + } catch (Exception e) { + LOG.error("Error processing equality deletes for table {} task {}", tableName, taskName, e); + output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e)); + Snapshot mainSnapshot = table.snapshot(targetBranch); + Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null; + emitNoOpResult(triggerTs, currentMainSnapshotId); + } + } + + /** + * Brings the worker's index up to date with the current state of the target branch: + * + * <ul> + * <li>Updates {@link #lastStagingSnapshotId} from the most recent committer marker on main. + * <li>Bootstraps the index from main on the first trigger with a non-null main snapshot. + * <li>Reindexes from main when external commits (e.g. compaction or direct writes) have + * advanced main past the currently-indexed snapshot. + * </ul> + * + * <p>No-op when main hasn't moved since the last trigger. Otherwise the history walk is bounded + * to commits added since {@link #lastMainSnapshotId}. + */ + private void ensureIndexCurrent(Snapshot mainSnapshot) { + Long currentMainSnapshotId = mainSnapshot != null ? mainSnapshot.snapshotId() : null; + + if (Objects.equals(lastMainSnapshotId, currentMainSnapshotId)) { + return; + } + + LastCommittedWork info = discoverLastCommittedWork(mainSnapshot); + updateLastStagingSnapshotId(info); + + boolean bootstrap = mainSnapshot != null && indexSnapshotId == null; + boolean reindex = indexSnapshotId != null && info.externalCommitCount() > 0; + if (bootstrap || reindex) { + LOG.info( + "{} worker index from main snapshot {} for field IDs {}.", + bootstrap ? 
"Bootstrapping" : "Reindexing", + currentMainSnapshotId, + eqFieldIds); + indexSnapshotId = currentMainSnapshotId; + emitMainDataReadCommands(mainSnapshot); + if (reindex) { + reindexCounter.inc(); + } + } + + lastMainSnapshotId = currentMainSnapshotId; + } + + private void updateLastStagingSnapshotId(LastCommittedWork info) { + if (info.lastCommittedStaging() != null) { + lastStagingSnapshotId = info.lastCommittedStaging(); + return; + } + + Preconditions.checkState( + lastMainSnapshotId == null || lastStagingSnapshotId == null || info.reachedLastInspected(), + "No COMMITTED_STAGING_SNAPSHOT marker reachable on target branch '%s' for table %s, " + + "but a prior marker was seen (lastStagingSnapshotId=%s). Target may have been " + + "rewritten (rollback, replace_main, or snapshot expiration). " + + "Manual intervention required.", + targetBranch, + tableName, + lastStagingSnapshotId); + } + + /** + * Walks main back from head looking for the most recent snapshot tagged with {@link + * EqualityConvertCommitter#COMMITTED_STAGING_SNAPSHOT_PROPERTY}. Returns the staging snapshot id + * recorded there (or {@code null} if not reached) and the count of intervening external commits. + * + * <p>The walk stops at whichever comes first: + * + * <ul> + * <li>The first snapshot carrying the committer marker. + * <li>{@link #lastMainSnapshotId} — anything older was inspected on a previous trigger. 
+ * </ul> + */ + private LastCommittedWork discoverLastCommittedWork(Snapshot mainSnapshot) { + Long lastCommittedStaging = null; + int externalCount = 0; + boolean reachedLastInspected = false; + Snapshot current = mainSnapshot; + while (current != null) { + if (lastMainSnapshotId != null && current.snapshotId() == lastMainSnapshotId) { + reachedLastInspected = true; + break; + } + + String prop = + current.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY); + if (prop != null) { + lastCommittedStaging = Long.parseLong(prop); + break; + } + + externalCount++; + current = parentOf(current); + } + + return new LastCommittedWork(lastCommittedStaging, externalCount, reachedLastInspected); + } + + private record LastCommittedWork( + Long lastCommittedStaging, int externalCommitCount, boolean reachedLastInspected) {} + + /** + * Walks staging history from head back to the stop point and returns the oldest unprocessed + * snapshot to convert this cycle, or {@code null} if there's nothing new. + * + * <p>The stop point is: + * + * <ul> + * <li>the last-processed staging snapshot, if known; + * <li>otherwise, on cold start with {@code stagingBranch != targetBranch}, the common ancestor + * with target; + * <li>otherwise, on cold start with {@code stagingBranch == targetBranch}, the head's parent. + * </ul> + * + * <p>When {@code stagingBranch == targetBranch}, the writer commits eq-deletes and data files + * directly to target; the converter performs in-place eq-delete-to-DV compaction. Older snapshots + * already on target are assumed converted, so on cold start only the head is in scope. 
+ */ +  private Snapshot nextUnprocessedStagingSnapshot(Snapshot stagingHead, Snapshot mainSnapshot) { +    if (stagingHead == null) { +      return null; +    } + +    Long stopAt; +    if (lastStagingSnapshotId != null) { +      stopAt = lastStagingSnapshotId; +    } else if (stagingBranch.equals(targetBranch)) { +      // On cold start with a shared branch, everything before head is already on target. Review Comment: I'm a bit confused here. Can we talk a bit about the use case here? My thought was that we have a normal Flink Sink which writes to the table, and in the post-commit topology we have the EQResolver to resolve everything. In this case there is a concurrency issue between the writer and the resolver: the writer might create eq deletes, and the resolver might skip them because the scheduler does not start the EQ resolver immediately. For example, if the writer commits several snapshots before the resolver's first trigger, this cold-start path treats everything before the head as already converted, so the eq deletes in those earlier snapshots would never be converted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
