fhueske commented on code in PR #28329: URL: https://github.com/apache/flink/pull/28329#discussion_r3461343173
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java: ########## @@ -0,0 +1,1038 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.snapshot; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.DefaultOpenContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.KeyedStateBackend; +import 
org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.metrics.SimpleGauge; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; + +/** + * Stream operator implementing the {@code LATERAL SNAPSHOT} processing-time temporal table join. 
+ * + * <p>The operator runs in two operator-wide phases that progress only forward, LOAD then JOIN: LOAD + * bootstraps the build-side table from its changelog up to {@code loadCompletedTime}, buffering + * probe rows meanwhile; JOIN then continuously joins probe rows against the materialized build-side + * state. How each input is handled depends on the current phase: + * + * <ul> + * <li>Build-side (input2 / right) changes are handled the same way in both phases: they are + * buffered in {@code buildChangeBuffer} and applied lazily to a per-key multi-set in {@code + * buildTableState} on the next per-key access (build- or probe-side) once the build-side + * watermark has advanced past the buffer's tag, or at the flip. The buffered changelog is + * applied in event-time order: changes are sorted by the build-side row-time attribute + * ({@code buildRowtimeIndex}), and for equal row-times, retractions ({@code -U}/{@code -D}) + * are applied before accumulations ({@code +U}/{@code +I}). Buffering until the watermark + * passes preserves atomic update visibility across {@code -U}/{@code +U} pairs in the JOIN phase. + * <li>Probe-side (input1 / left) records are handled differently per phase. During LOAD, they are + * buffered in {@code probeBuffer} until the build-side watermark reaches the configured flip + * point, at which point a per-key event-time timer drains the buffered probes + * and joins them with the materialized build-side state. During JOIN, they are joined + * immediately with the current build-side state. + * </ul> + * + * <p>Watermark forwarding rules: + * + * <ul> + * <li>Build-side watermarks are never forwarded downstream. + * <li>Probe-side watermarks are held back during LOAD and forwarded during the JOIN phase. 
+ * </ul> + * + * <p>The flip from the LOAD phase to the JOIN phase is triggered by either: + * + * <ul> + * <li>the build-side watermark reaching {@code loadCompletedTime} (event-time gate), or + * <li>the {@code loadCompletedIdleTimeoutMs} processing-time timer firing without any build-side + * watermark advance. Review Comment: The idle timer is (re-)started in `open()` after state recovery, so time elapsed before recovery should not reduce the effective idle interval. Wouldn't a new build-side record also cause the emission of a new build-side watermark due to periodic watermark generation (default interval: 200 ms)? Or am I misunderstanding your point? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
