This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new ebf14908 [AURON #2061] AuronKafkaSourceFunction support flink
checkpoint (#2084)
ebf14908 is described below
commit ebf14908d036d9631a76c9dc078ea7259b98f55f
Author: zhangmang <[email protected]>
AuthorDate: Wed Mar 11 16:50:12 2026 +0800
[AURON #2061] AuronKafkaSourceFunction support flink checkpoint (#2084)
# Which issue does this PR close?
Closes #2061
# Rationale for this change
* Make the Auron Kafka source support Flink checkpointing
# What changes are included in this PR?
* Modify AuronKafkaSourceFunction to implement the Flink checkpoint interfaces
* Modify kafka_scan_exec.rs to add an offset restore and commit mechanism
* Copy KafkaTopicPartition from Flink to satisfy Flink state compatibility requirements
# Are there any user-facing changes?
* No
# How was this patch tested?
* There is currently no Kafka environment integration, so automated
testing is not possible.
---
.../connector/kafka/AuronKafkaSourceFunction.java | 146 ++++++++++++++++++++-
.../kafka/internals/KafkaTopicPartition.java | 126 ++++++++++++++++++
.../src/flink/kafka_scan_exec.rs | 64 ++++++++-
3 files changed, 327 insertions(+), 9 deletions(-)
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 369f4b1d..569e63a4 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -25,32 +25,51 @@ import org.apache.auron.flink.arrow.FlinkArrowReader;
import org.apache.auron.flink.arrow.FlinkArrowUtils;
import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
import org.apache.auron.flink.runtime.operator.FlinkAuronFunction;
+import org.apache.auron.flink.table.data.AuronColumnarRowData;
import org.apache.auron.flink.utils.SchemaConverters;
import org.apache.auron.jni.AuronAdaptor;
import org.apache.auron.jni.AuronCallNativeWrapper;
+import org.apache.auron.jni.JniBridge;
import org.apache.auron.metric.MetricNode;
import org.apache.auron.protobuf.KafkaFormat;
import org.apache.auron.protobuf.KafkaScanExecNode;
import org.apache.auron.protobuf.KafkaStartupMode;
import org.apache.auron.protobuf.PhysicalPlanNode;
+import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.SerializableObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Auron Kafka source function.
+ * Only support AT-LEAST ONCE semantics.
+ * If checkpoints are enabled, Kafka offsets are committed via Auron after a
successful checkpoint.
+ * If checkpoints are disabled, Kafka offsets are committed periodically via
Auron.
*/
-public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData> implements FlinkAuronFunction {
+public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData>
+ implements FlinkAuronFunction, CheckpointListener,
CheckpointedFunction {
private static final Logger LOG =
LoggerFactory.getLogger(AuronKafkaSourceFunction.class);
-
private final LogicalType outputType;
private final String auronOperatorId;
private final String topic;
@@ -60,9 +79,22 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
private final int bufferSize;
private final String startupMode;
private transient PhysicalPlanNode physicalPlanNode;
+ // Flink Checkpoint-related, compatible with Flink Kafka Legacy source
+ /** State name of the consumer's partition offset states. */
+ private static final String OFFSETS_STATE_NAME =
"topic-partition-offset-states";
+
+ private transient ListState<Tuple2<KafkaTopicPartition, Long>>
unionOffsetStates;
+ /** Data for pending but uncommitted offsets. */
+ private transient LinkedMap pendingOffsetsToCommit;
+
+ private transient Map<Integer, Long> restoredOffsets;
+ private transient Map<Integer, Long> currentOffsets;
+ private final SerializableObject lock = new SerializableObject();
+
private volatile boolean isRunning;
private transient String auronOperatorIdWithSubtaskIndex;
private transient MetricNode nativeMetric;
+ private transient ObjectMapper mapper;
public AuronKafkaSourceFunction(
LogicalType outputType,
@@ -91,7 +123,7 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
scanExecNode.setKafkaTopic(this.topic);
scanExecNode.setKafkaPropertiesJson(this.kafkaPropertiesJson);
scanExecNode.setDataFormat(KafkaFormat.valueOf(this.format.toUpperCase(Locale.ROOT)));
- ObjectMapper mapper = new ObjectMapper();
+ mapper = new ObjectMapper();
scanExecNode.setFormatConfigJson(mapper.writeValueAsString(formatConfig));
scanExecNode.setBatchSize(this.bufferSize);
if
(this.format.equalsIgnoreCase(KafkaConstants.KAFKA_FORMAT_PROTOBUF)) {
@@ -103,10 +135,10 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
if (new File(pwd).exists()) {
File descFile = new File(pwd + "/" + pbDescFileName);
if (!descFile.exists()) {
- LOG.info("Auron kafka source writer pb desc file: " +
pbDescFileName);
+ LOG.info("Auron kafka source writer pb desc file: {}",
pbDescFileName);
FileUtils.copyInputStreamToFile(in, descFile);
} else {
- LOG.warn("Auron kafka source pb desc file already exist,
skip copy " + pbDescFileName);
+ LOG.warn("Auron kafka source pb desc file already exist,
skip copy {}", pbDescFileName);
}
} else {
throw new RuntimeException("PWD is not exist");
@@ -120,6 +152,22 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();
+
+ StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)
getRuntimeContext();
+ boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
+ Map<String, Object> auronRuntimeInfo = new HashMap<>();
+ auronRuntimeInfo.put("subtask_index",
runtimeContext.getIndexOfThisSubtask());
+ auronRuntimeInfo.put("num_readers",
runtimeContext.getNumberOfParallelSubtasks());
+ auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
+ auronRuntimeInfo.put("restored_offsets", restoredOffsets);
+ JniBridge.putResource(auronOperatorIdWithSubtaskIndex,
mapper.writeValueAsString(auronRuntimeInfo));
+ this.isRunning = true;
+ LOG.info(
+ "Auron kafka source init successful, Auron operator id: {},
enableCheckpoint is {}",
+ auronOperatorIdWithSubtaskIndex,
+ enableCheckpoint);
+ currentOffsets = new HashMap<>();
+ pendingOffsetsToCommit = new LinkedMap();
this.isRunning = true;
}
@@ -129,7 +177,7 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
@Override
public void add(String name, long value) {
// TODO Integration with Flink metrics
- LOG.info(String.format("Metric Auron Source: %s = %s", name,
value));
+ LOG.info("Metric Auron Source: {} = {}", name, value);
}
};
List<RowType.RowField> fieldList = new LinkedList<>();
@@ -150,10 +198,17 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
.getAuronConfiguration()
.getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
while (wrapper.loadNextBatch(batch -> {
+ Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
FlinkArrowReader arrowReader = FlinkArrowReader.create(batch,
auronOutputRowType, 3);
for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData)
arrowReader.read(i);
+ // update kafka partition and offsets
+ tmpOffsets.put(tmpRowData.getInt(-3),
tmpRowData.getLong(-2));
sourceContext.collect(arrowReader.read(i));
}
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
})) {}
;
}
@@ -184,4 +239,83 @@ public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData
public MetricNode getMetricNode() {
return nativeMetric;
}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ try {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.debug(
+ "Consumer subtask {} received confirmation for unknown
checkpoint id {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ checkpointId);
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<Integer, Long> offsets = (Map<Integer, Long>)
pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ int subTaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ if (offsets == null || offsets.size() == 0) {
+ LOG.info("Consumer subtask {} has empty checkpoint state.",
subTaskIndex);
+ return;
+ }
+ String commitOffsetsKey = auronOperatorIdWithSubtaskIndex +
"-offsets2commit";
+ LOG.info(
+ "Subtask {} commit [{}] offsets for checkpoint: {},
offsets: {}",
+ subTaskIndex,
+ commitOffsetsKey,
+ checkpointId,
+ offsets);
+ JniBridge.putResource(commitOffsetsKey,
mapper.writeValueAsString(offsets));
+ } catch (Exception e) {
+ LOG.error("NotifyCheckpointComplete error: ", e);
+ if (isRunning) {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+ if (!isRunning) {
+ LOG.warn("Auron kafka source is not running, skip snapshot state");
+ } else {
+ Map<Integer, Long> copyCurrentOffsets;
+ synchronized (lock) {
+ // copy offsets, ensure that the corresponding offset has been
dispatched to downstream.
+ copyCurrentOffsets = new HashMap<>(currentOffsets);
+ }
+ pendingOffsetsToCommit.put(context.getCheckpointId(),
copyCurrentOffsets);
+ for (Map.Entry<Integer, Long> offset :
copyCurrentOffsets.entrySet()) {
+ unionOffsetStates.add(Tuple2.of(new KafkaTopicPartition(topic,
offset.getKey()), offset.getValue()));
+ }
+ LOG.info(
+ "snapshotState for checkpointId: {}, currentOffsets: {}",
+ context.getCheckpointId(),
+ copyCurrentOffsets);
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws
Exception {
+ OperatorStateStore stateStore = context.getOperatorStateStore();
+ this.unionOffsetStates = stateStore.getUnionListState(new
ListStateDescriptor<>(
+ OFFSETS_STATE_NAME, TypeInformation.of(new
TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
+ this.restoredOffsets = new HashMap<>();
+ if (context.isRestored()) {
+ for (Tuple2<KafkaTopicPartition, Long>
kafkaTopicPartitionOffsetEntry : unionOffsetStates.get()) {
+ restoredOffsets.put(
+ kafkaTopicPartitionOffsetEntry.f0.getPartition(),
kafkaTopicPartitionOffsetEntry.f1);
+ }
+ LOG.info("Restore from state, restoredOffsets: {}",
restoredOffsets);
+ } else {
+ LOG.info("Not restore from state.");
+ }
+ }
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 00000000..00ec4085
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Auron to maintain compatibility with Flink's existing legacy kafka state,
this class implementation was copied.
+ * Flink's description of a partition in a Kafka topic. Serializable, and
common across all Kafka
+ * consumer subclasses (0.8, 0.9, ...)
+ *
+ * <p>Note: This class must not change in its structure, because it would
change the serialization
+ * format and make previous savepoints unreadable.
+ */
+@PublicEvolving
+public final class KafkaTopicPartition implements Serializable {
+
+ /**
+ * THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK READING
OLD SERIALIZED
+ * INSTANCES FROM SAVEPOINTS.
+ */
+ private static final long serialVersionUID = 722083576322742325L;
+
+ // ------------------------------------------------------------------------
+
+ private final String topic;
+ private final int partition;
+ private final int cachedHash;
+
+ public KafkaTopicPartition(String topic, int partition) {
+ this.topic = requireNonNull(topic);
+ this.partition = partition;
+ this.cachedHash = 31 * topic.hashCode() + partition;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "KafkaTopicPartition{" + "topic='" + topic + '\'' + ",
partition=" + partition + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o instanceof KafkaTopicPartition) {
+ KafkaTopicPartition that = (KafkaTopicPartition) o;
+ return this.partition == that.partition &&
this.topic.equals(that.topic);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return cachedHash;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static String toString(Map<KafkaTopicPartition, Long> map) {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<KafkaTopicPartition, Long> p : map.entrySet()) {
+ KafkaTopicPartition ktp = p.getKey();
+ sb.append(ktp.getTopic())
+ .append(":")
+ .append(ktp.getPartition())
+ .append("=")
+ .append(p.getValue())
+ .append(", ");
+ }
+ return sb.toString();
+ }
+
+ public static String toString(List<KafkaTopicPartition> partitions) {
+ StringBuilder sb = new StringBuilder();
+ for (KafkaTopicPartition p : partitions) {
+
sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
+ }
+ return sb.toString();
+ }
+
+ /** A {@link java.util.Comparator} for {@link KafkaTopicPartition}s. */
+ public static class Comparator implements
java.util.Comparator<KafkaTopicPartition> {
+ @Override
+ public int compare(KafkaTopicPartition p1, KafkaTopicPartition p2) {
+ if (!p1.getTopic().equals(p2.getTopic())) {
+ return p1.getTopic().compareTo(p2.getTopic());
+ } else {
+ return Integer.compare(p1.getPartition(), p2.getPartition());
+ }
+ }
+ }
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
index 0840b3d7..aa43e598 100644
--- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -38,7 +38,7 @@ use once_cell::sync::OnceCell;
use rdkafka::{
ClientConfig, ClientContext, Offset, TopicPartitionList,
config::RDKafkaLogLevel,
- consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer},
+ consumer::{CommitMode, Consumer, ConsumerContext, Rebalance,
StreamConsumer},
error::KafkaResult,
};
use sonic_rs::{JsonContainerTrait, JsonValueTrait};
@@ -245,6 +245,13 @@ fn read_serialized_records_from_kafka(
.get("subtask_index")
.as_i64()
.expect("subtask_index is not valid json") as i32;
+ let enable_checkpoint = task_json
+ .get("enable_checkpoint")
+ .as_bool()
+ .expect("enable_checkpoint is not valid json");
+ let restored_offsets = task_json
+ .get("restored_offsets")
+ .expect("restored_offsets is not valid json");
let kafka_properties =
sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
.expect("kafka_properties_json is not valid json");
let mut config = ClientConfig::new();
@@ -261,7 +268,12 @@ fn read_serialized_records_from_kafka(
.expect("kafka property value is not valid json string"),
);
});
-
+ if enable_checkpoint {
+ config.set("enable.auto.commit", "false");
+ } else {
+ config.set("enable.auto.commit", "true");
+ }
+ log::info!("Subtask {subtask_index} consumed kafka config is {config:?}");
let consumer: Arc<LoggingConsumer> = Arc::new(
config
.create_with_context(context)
@@ -314,7 +326,16 @@ fn read_serialized_records_from_kafka(
log::info!("Subtask {subtask_index} consumed partitions {partitions:?}");
let mut partition_list =
TopicPartitionList::with_capacity(partitions.len());
for partition in partitions.iter() {
- partition_list.add_partition_offset(&kafka_topic, *partition, offset);
+ let partition_key = partition.to_string();
+ let partition_offset = if let Some(val) =
restored_offsets.get(&partition_key) {
+ Offset::Offset(
+ val.as_i64()
+ .expect("restored_offsets value is not valid json"),
+ )
+ } else {
+ offset
+ };
+ partition_list.add_partition_offset(&kafka_topic, *partition,
partition_offset);
}
consumer
.assign(&partition_list)
@@ -360,6 +381,43 @@ fn read_serialized_records_from_kafka(
],
)?;
sender.send(batch).await;
+ if enable_checkpoint {
+ // if checkpoint is enabled, commit offsets to kafka
+ let offset_to_commit = auron_operator_id.clone() +
"-offsets2commit";
+ let resource_id = jni_new_string!(&offset_to_commit)?;
+ let java_json_str =
+
jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?;
+ if !java_json_str.0.is_null() {
+ let offset_json_str =
jni_get_string!(java_json_str.as_obj().into())
+ .expect("java_json_str is not valid java string");
+ let offsets_to_commit =
+
sonic_rs::from_str::<sonic_rs::Value>(&offset_json_str)
+ .expect("offset_json_str is not valid json");
+ let mut partition_list =
TopicPartitionList::with_capacity(
+ offsets_to_commit
+ .as_object()
+ .expect("offsets_to_commit is not valid json")
+ .len(),
+ );
+ if let Some(obj) = offsets_to_commit.as_object() {
+ if !obj.is_empty() {
+ for (partition, offset) in obj {
+ partition_list.add_partition_offset(
+ &kafka_topic,
+ partition
+ .parse::<i32>()
+ .expect("partition is not valid
json"),
+ Offset::Offset(
+ offset.as_i64().expect("offset is
not valid json"),
+ ),
+ );
+ }
+ log::info!("auron consumer to commit offset:
{partition_list:?}");
+ consumer.commit(&partition_list,
CommitMode::Async);
+ }
+ }
+ }
+ }
}
}))
}