fhueske commented on code in PR #28329:
URL: https://github.com/apache/flink/pull/28329#discussion_r3461343173


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java:
##########
@@ -0,0 +1,1038 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.snapshot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.DefaultOpenContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.metrics.SimpleGauge;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Stream operator implementing the {@code LATERAL SNAPSHOT} processing-time temporal table join.
+ *
+ * <p>The operator runs in two operator-wide phases that progress only forward, LOAD then JOIN: LOAD
+ * bootstraps the build-side table from its changelog up to {@code loadCompletedTime}, buffering
+ * probe rows meanwhile; JOIN then continuously joins probe rows against the materialized build-side
+ * state. How each input is handled depends on the current phase:
+ *
+ * <ul>
+ *   <li>Build-side (input2 / right) changes are handled the same way in both phases: they are
+ *       buffered in {@code buildChangeBuffer} and applied lazily to a per-key multi-set in {@code
+ *       buildTableState} on the next per-key access (build- or probe-side) once the build-side
+ *       watermark has advanced past the buffer's tag, or at the flip. The buffered changelog is
+ *       applied in event-time order: changes are sorted by the build-side row-time attribute
+ *       ({@code buildRowtimeIndex}), and for equal row-times retractions ({@code -U}/{@code -D})
+ *       are applied before accumulations ({@code +U}/{@code +I}). Buffering until the watermark
+ *       passes preserves atomic update visibility across {@code -U}/{@code +U} pairs in JOIN phase.
+ *   <li>Probe-side (input1 / left) records are handled differently per phase. During LOAD they are
+ *       buffered in {@code probeBuffer} until the configured flip point is reached on the
+ *       build-side watermark, at which point a per-key event-time timer drains the buffered probes
+ *       and joins them with the materialized build-side state. During JOIN they are joined
+ *       immediately with the current build-side state.
+ * </ul>
+ *
+ * <p>Watermark forwarding rules:
+ *
+ * <ul>
+ *   <li>Build-side watermarks are never forwarded downstream.
+ *   <li>Probe-side watermarks are held back during LOAD and forwarded during JOIN phase.
+ * </ul>
+ *
+ * <p>The flip from LOAD to JOIN phase is triggered by either:
+ *
+ * <ul>
+ *   <li>the build-side watermark reaching {@code loadCompletedTime} (event-time gate), or
+ *   <li>the {@code loadCompletedIdleTimeoutMs} processing-time timer firing without any build-side
+ *       watermark advance.

Review Comment:
   The idle timer is (re-)started in `open()` after state recovery, so this time should not reduce the effective idle interval.
   Wouldn't a new build-side record also cause the emission of a new build-side watermark due to periodic watermark generation (the default interval being 200 ms)?
   
   Or am I misunderstanding your point?
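
For reference, a minimal plain-Java sketch of the two flip triggers as the Javadoc above describes them. It does not use any Flink API: the class `LoadPhaseGateSketch` and all of its methods are hypothetical names, and the rule that a build-side watermark advance restarts the idle timer is an assumption read from "firing without any build-side watermark advance", not something confirmed by the PR code.

```java
/** Hypothetical stand-in for the operator's LOAD -> JOIN gate; not the Flink API. */
final class LoadPhaseGateSketch {
    enum Phase { LOAD, JOIN }

    private final long loadCompletedTime; // event-time gate
    private final long idleTimeoutMs;     // processing-time idle timeout
    private Phase phase = Phase.LOAD;
    private long buildWatermark = Long.MIN_VALUE;
    private long idleDeadlineMs;

    LoadPhaseGateSketch(long loadCompletedTime, long idleTimeoutMs, long nowMs) {
        this.loadCompletedTime = loadCompletedTime;
        this.idleTimeoutMs = idleTimeoutMs;
        restartIdleTimer(nowMs);
    }

    /** Models the (re-)start in open(): the full idle interval applies again. */
    void restartIdleTimer(long nowMs) {
        idleDeadlineMs = nowMs + idleTimeoutMs;
    }

    /** Build-side watermark: flips at the gate, otherwise an advance resets the idle timer. */
    void onBuildWatermark(long watermark, long nowMs) {
        if (phase == Phase.JOIN || watermark <= buildWatermark) {
            return; // the phase only moves forward; non-advancing watermarks are ignored
        }
        buildWatermark = watermark;
        if (buildWatermark >= loadCompletedTime) {
            phase = Phase.JOIN;      // event-time gate reached
        } else {
            restartIdleTimer(nowMs); // assumption: an advance restarts the idle interval
        }
    }

    /** Processing-time tick: flips if no watermark advance happened within the interval. */
    void onProcessingTime(long nowMs) {
        if (phase == Phase.LOAD && nowMs >= idleDeadlineMs) {
            phase = Phase.JOIN;
        }
    }

    Phase phase() {
        return phase;
    }
}
```

Under these assumptions, calling `restartIdleTimer` again after recovery can only push the deadline later, so recovery would not shorten the effective idle interval.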


