This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 440cf90d [AURON #2062] Fix AuronKafkaSourceFunction watermark not being generated (#2098)
440cf90d is described below
commit 440cf90db79b9f72fbf63d1a1884705bfbed574d
Author: zhangmang <[email protected]>
AuthorDate: Thu Mar 19 11:04:13 2026 +0800
[AURON #2062] Fix AuronKafkaSourceFunction watermark not being generated (#2098)
# Which issue does this PR close?
Closes #2062
# Rationale for this change
* use `org.apache.flink.table.runtime.generated.WatermarkGenerator` to calculate the watermark
# What changes are included in this PR?
* add `flink-table-runtime` dependency
* modify `AuronKafkaSourceFunction` to use `org.apache.flink.table.runtime.generated.WatermarkGenerator`
# Are there any user-facing changes?
* No
# How was this patch tested?
* JNI and Spark need to be decoupled
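For context, the combining rule the new implementation uses can be shown in isolation. The following is a minimal illustrative sketch (class and method names are ours, not part of the commit): each partition keeps the maximum watermark it has produced, and the source emits the minimum across all partitions, so the slowest partition holds the combined watermark back and the output stays monotonic.

    // Illustrative sketch of the min-across-partitions combining rule; not committed code.
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class MinWatermarkCombiner {
        private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
        private long combined = Long.MIN_VALUE;

        /** Returns the new combined watermark to emit, or null if it did not advance. */
        public Long onPartitionWatermark(int partition, long watermark) {
            // Per-partition progress only moves forward: keep the max seen per partition.
            partitionWatermarks.merge(partition, watermark, Math::max);
            // The emitted watermark is the min across partitions and must only advance.
            long min = Collections.min(partitionWatermarks.values());
            if (min > combined) {
                combined = min;
                return min;
            }
            return null;
        }
    }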
---
auron-flink-extension/auron-flink-runtime/pom.xml | 6 +
.../kafka/AuronKafkaDynamicTableSource.java | 2 +-
.../connector/kafka/AuronKafkaSourceFunction.java | 238 +++++++++------------
.../SourceContextWatermarkOutputAdapter.java | 48 -----
4 files changed, 107 insertions(+), 187 deletions(-)
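The code path changed below is exercised when a Kafka table is declared with a WATERMARK clause, which the planner pushes into the scan as a WatermarkPushDownSpec. A hypothetical trigger for it (the DDL shape is standard Flink SQL, but the connector options here are illustrative, not a tested Auron configuration):

    // Hypothetical table definition whose WATERMARK clause ends up as the
    // push-down WatermarkStrategy handled by AuronKafkaSourceFunction below.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class WatermarkDdlExample {
        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            env.executeSql(
                    "CREATE TABLE events ("
                            + "  id BIGINT,"
                            + "  ts TIMESTAMP(3),"
                            + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                            + ") WITH ("
                            + "  'connector' = 'kafka',"
                            + "  'topic' = 'events',"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',"
                            + "  'format' = 'json'"
                            + ")");
        }
    }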
diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml
index 654ce826..d99a2dbf 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -74,6 +74,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Kafka client for partition metadata discovery -->
<dependency>
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 837ea581..4b974d29 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -88,7 +88,7 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
startupMode);
if (watermarkStrategy != null) {
- sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy);
+ sourceFunction.setWatermarkStrategy(watermarkStrategy);
}
return new DataStreamScanProvider() {
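The AuronKafkaSourceFunction changes that follow obtain the code-generated table-runtime generator by reading the private innerWatermarkGenerator field of Flink's GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator (both names are taken from the patch). A hedged sketch of that extraction step as a standalone helper, with illustrative error propagation:

    // Illustrative helper mirroring the reflective extraction in the patch below;
    // it breaks at runtime if Flink's internal class layout ever changes.
    import java.lang.reflect.Field;

    final class PrivateFieldReader {
        private PrivateFieldReader() {}

        @SuppressWarnings("unchecked")
        static <T> T read(Object target, Class<?> declaringClass, String fieldName)
                throws ReflectiveOperationException {
            Field field = declaringClass.getDeclaredField(fieldName); // e.g. "innerWatermarkGenerator"
            field.setAccessible(true); // the field is private in Flink's class
            return (T) field.get(target);
        }
    }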
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 52c56fa2..16d16da9 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -17,10 +17,10 @@
package org.apache.auron.flink.connector.kafka;
import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import java.io.File;
import java.io.InputStream;
+import java.lang.reflect.Field;
import java.util.*;
import org.apache.auron.flink.arrow.FlinkArrowReader;
import org.apache.auron.flink.arrow.FlinkArrowUtils;
@@ -38,11 +38,6 @@ import org.apache.auron.protobuf.KafkaStartupMode;
import org.apache.auron.protobuf.PhysicalPlanNode;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
@@ -50,7 +45,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
@@ -60,16 +54,17 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
-import org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
+import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -82,9 +77,9 @@ import org.slf4j.LoggerFactory;
* If checkpoints are enabled, Kafka offsets are committed via Auron after a successful checkpoint.
* If checkpoints are disabled, Kafka offsets are committed periodically via Auron.
*
- * <p>Watermark support is implemented via {@link WatermarkOutputMultiplexer} with per-partition
- * watermark generation. Partition expansion is detected periodically using a lightweight
- * {@link KafkaConsumer} (metadata queries only, no data consumption).
+ * <p>Watermark support uses the table-runtime {@link WatermarkGenerator} directly
+ * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
+ * The combined watermark emitted downstream is the minimum across all assigned partitions.
*/
public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData>
implements FlinkAuronFunction, CheckpointListener, CheckpointedFunction {
@@ -110,7 +105,6 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
private transient Map<Integer, Long> restoredOffsets;
private transient Map<Integer, Long> currentOffsets;
private final SerializableObject lock = new SerializableObject();
- private SerializedValue<WatermarkStrategy<RowData>> watermarkStrategy;
private volatile boolean isRunning;
private transient String auronOperatorIdWithSubtaskIndex;
private transient MetricNode nativeMetric;
@@ -120,14 +114,11 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
private transient List<Integer> assignedPartitions;
- // Watermark related
- private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer;
- private transient Map<Integer, String> partitionIdToOutputIdMap;
- private transient WatermarkGenerator<RowData> watermarkGenerator;
- private transient TimestampAssigner<RowData> timestampAssigner;
- // Periodic watermark control: autoWatermarkInterval > 0 means enabled
- private transient long autoWatermarkInterval;
- private transient long lastPeriodicWatermarkTime;
+ // Watermark related: uses table-runtime WatermarkGenerator directly
+ private WatermarkStrategy<RowData> watermarkStrategy;
+ private transient WatermarkGenerator tableWatermarkGenerator;
+ private transient Map<Integer, Long> partitionWatermarks;
+ private transient long currentCombinedWatermark;
public AuronKafkaSourceFunction(
LogicalType outputType,
@@ -231,22 +222,24 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
subtaskIndex,
assignedPartitions);
- // 3. Initialize Watermark components if watermarkStrategy is set
+ // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
if (watermarkStrategy != null) {
- ClassLoader userCodeClassLoader = runtimeContext.getUserCodeClassLoader();
- WatermarkStrategy<RowData> deserializedWatermarkStrategy =
- watermarkStrategy.deserializeValue(userCodeClassLoader);
-
MetricGroup metricGroup = runtimeContext.getMetricGroup();
-
- this.timestampAssigner = deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup);
-
- this.watermarkGenerator = deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup);
-
- // 4. Determine periodic watermark interval
- // autoWatermarkInterval > 0 means periodic watermark is enabled
- this.autoWatermarkInterval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
- this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first emit triggers immediately
+ // Create DataStream API WatermarkGenerator via the strategy
+ org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> dsGenerator =
+ watermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+ // Extract inner table-runtime WatermarkGenerator from DefaultWatermarkGenerator
+ if (dsGenerator instanceof GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator) {
+ Field field = GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class.getDeclaredField("innerWatermarkGenerator");
+ field.setAccessible(true);
+ this.tableWatermarkGenerator = (WatermarkGenerator) field.get(dsGenerator);
+ } else {
+ throw new IllegalStateException("Expected DefaultWatermarkGenerator from WatermarkPushDownSpec, got: " + dsGenerator.getClass().getName());
+ }
+ this.partitionWatermarks = new HashMap<>();
+ this.currentCombinedWatermark = Long.MIN_VALUE;
}
this.isRunning = true;
}
@@ -267,97 +260,76 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
fieldList.addAll(((RowType) outputType).getFields());
RowType auronOutputRowType = new RowType(fieldList);
- // Initialize WatermarkOutputMultiplexer here because sourceContext is available
- if (watermarkGenerator != null) {
- this.watermarkOutputMultiplexer =
- new WatermarkOutputMultiplexer(new SourceContextWatermarkOutputAdapter<>(sourceContext));
- this.partitionIdToOutputIdMap = new HashMap<>();
- for (Integer partition : assignedPartitions) {
- String outputId = createOutputId(partition);
- partitionIdToOutputIdMap.put(partition, outputId);
- watermarkOutputMultiplexer.registerNewOutput(outputId, watermark -> {});
- }
- }
-
// Pre-check watermark flag to avoid per-record null checks in the hot path
- final boolean enableWatermark = watermarkGenerator != null;
-
- while (this.isRunning) {
- AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
- FlinkArrowUtils.getRootAllocator(),
- physicalPlanNode,
- nativeMetric,
- 0,
- 0,
- 0,
- AuronAdaptor.getInstance()
- .getAuronConfiguration()
- .getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
-
- if (enableWatermark) {
- // Watermark-enabled path
- while (wrapper.loadNextBatch(batch -> {
- Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
- FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
-
- for (int i = 0; i < batch.getRowCount(); i++) {
- AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
- // Extract kafka meta fields
- int partitionId = tmpRowData.getInt(-3);
- long offset = tmpRowData.getLong(-2);
- long kafkaTimestamp = tmpRowData.getLong(-1);
- tmpOffsets.put(partitionId, offset);
-
- // Extract event timestamp via user-defined TimestampAssigner
- long timestamp = timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp);
-
- // Route to the per-partition WatermarkOutput and trigger onEvent
- // outputId must not null, else is a bug
- String outputId = partitionIdToOutputIdMap.get(partitionId);
- WatermarkOutput partitionOutput = watermarkOutputMultiplexer.getImmediateOutput(outputId);
- watermarkGenerator.onEvent(tmpRowData, timestamp, partitionOutput);
- // Emit record with event timestamp
- sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp);
- }
-
- // Periodic watermark: only emit if enough time has elapsed since last emit
- // Controlled by ExecutionConfig.getAutoWatermarkInterval()
- long currentTime = System.currentTimeMillis();
- if (autoWatermarkInterval > 0
- && (currentTime - lastPeriodicWatermarkTime) >= autoWatermarkInterval) {
- for (Map.Entry<Integer, String> entry : partitionIdToOutputIdMap.entrySet()) {
- // Use getDeferredOutput for periodic emit: all partitions update first,
- // then multiplexer merges and emits once via onPeriodicEmit()
- WatermarkOutput output = watermarkOutputMultiplexer.getDeferredOutput(entry.getValue());
- watermarkGenerator.onPeriodicEmit(output);
+ final boolean enableWatermark = tableWatermarkGenerator != null;
+
+ AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
+ FlinkArrowUtils.getRootAllocator(),
+ physicalPlanNode,
+ nativeMetric,
+ 0,
+ 0,
+ 0,
+ AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
+
+ if (enableWatermark) {
+ // Watermark-enabled path: use table-runtime WatermarkGenerator directly
+ while (wrapper.loadNextBatch(batch -> {
+ Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+ // Extract kafka meta fields
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+
+ try {
+ // Compute watermark using table-runtime WatermarkGenerator (stateless pure function)
+ // with local Timezone
+ Long watermark = tableWatermarkGenerator.currentWatermark(tmpRowData);
+ // Update per-partition watermark tracking
+ if (watermark != null) {
+ partitionWatermarks.merge(partitionId, watermark, Math::max);
}
- // Merge all deferred updates and emit the combined watermark downstream
- watermarkOutputMultiplexer.onPeriodicEmit();
- lastPeriodicWatermarkTime = currentTime;
+ } catch (Exception e) {
+ throw new RuntimeException("Generated WatermarkGenerator fails to generate:", e);
}
+ // Emit record with kafka timestamp
+ sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+ }
- synchronized (lock) {
- currentOffsets = tmpOffsets;
- }
- })) {}
- } else {
- // No-watermark path: still use collectWithTimestamp with kafka timestamp
- while (wrapper.loadNextBatch(batch -> {
- Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
- FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
- for (int i = 0; i < batch.getRowCount(); i++) {
- AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
- int partitionId = tmpRowData.getInt(-3);
- long offset = tmpRowData.getLong(-2);
- long kafkaTimestamp = tmpRowData.getLong(-1);
- tmpOffsets.put(partitionId, offset);
- sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp);
- }
- synchronized (lock) {
- currentOffsets = tmpOffsets;
+ // After each batch, compute combined watermark (min across all partitions) and emit
+ if (!partitionWatermarks.isEmpty()) {
+ long minWatermark = Collections.min(partitionWatermarks.values());
+ if (minWatermark > currentCombinedWatermark) {
+ currentCombinedWatermark = minWatermark;
+ sourceContext.emitWatermark(new Watermark(minWatermark));
}
- })) {}
- }
+ }
+
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
+ })) {}
+ } else {
+ // No-watermark path: still use collectWithTimestamp with kafka timestamp
+ while (wrapper.loadNextBatch(batch -> {
+ Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+ sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+ }
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
+ })) {}
}
LOG.info("Auron kafka source run end");
}
@@ -376,6 +348,11 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
kafkaConsumer.close();
}
+ // Close table-runtime WatermarkGenerator
+ if (tableWatermarkGenerator != null) {
+ tableWatermarkGenerator.close();
+ }
+
super.close();
}
@@ -478,22 +455,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
}
}
- public AuronKafkaSourceFunction assignTimestampsAndWatermarks(WatermarkStrategy<RowData> watermarkStrategy) {
- checkNotNull(watermarkStrategy);
- try {
- ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
- } catch (Exception e) {
- throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
- }
- return this;
- }
-
- // -------------------------------------------------------------------------
- // Internal helpers
- // -------------------------------------------------------------------------
-
- private String createOutputId(int partitionId) {
- return topic + "-" + partitionId;
+ public void setWatermarkStrategy(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
}
}
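For reference, the table-runtime generator that run() now drives once per row has roughly the shape below. This subclass is a hand-written stand-in for the planner's code-generated class, assuming the event-time column is a BIGINT epoch-millis field at a hypothetical index 0 with a hypothetical 5-second bound (a TIMESTAMP(3) column would go through TimestampData instead):

    // Hand-written stand-in for the code-generated table-runtime WatermarkGenerator;
    // currentWatermark() is called once per row and may return null.
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.generated.WatermarkGenerator;

    public class BoundedOutOfOrderWatermark extends WatermarkGenerator {
        private static final int TS_FIELD = 0;              // hypothetical position of ts
        private static final long OUT_OF_ORDER_MS = 5000L;  // hypothetical 5s bound

        @Override
        public Long currentWatermark(RowData row) throws Exception {
            if (row.isNullAt(TS_FIELD)) {
                return null; // no watermark derivable from this row
            }
            return row.getLong(TS_FIELD) - OUT_OF_ORDER_MS;
        }
    }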
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
deleted file mode 100644
index ea819441..00000000
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-
-/**
- * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link
- * org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
- */
-public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput {
- private final SourceContext<T> sourceContext;
-
- public SourceContextWatermarkOutputAdapter(SourceContext<T> sourceContext) {
- this.sourceContext = sourceContext;
- }
-
- @Override
- public void emitWatermark(Watermark watermark) {
- sourceContext.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp()));
- }
-
- @Override
- public void markIdle() {
- sourceContext.markAsTemporarilyIdle();
- }
-
- @Override
- public void markActive() {
- // will be set active with next watermark
- }
-}