This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new ebf14908 [AURON #2061] AuronKafkaSourceFunction support flink 
checkpoint (#2084)
ebf14908 is described below

commit ebf14908d036d9631a76c9dc078ea7259b98f55f
Author: zhangmang <[email protected]>
AuthorDate: Wed Mar 11 16:50:12 2026 +0800

    [AURON #2061] AuronKafkaSourceFunction support flink checkpoint (#2084)
    
    # Which issue does this PR close?
    
    Closes #2061
    
    # Rationale for this change
    * The Auron Kafka source now supports Flink checkpointing
    
    # What changes are included in this PR?
    * modify AuronKafkaSourceFunction to implement the Flink checkpoint interfaces
    * modify kafka_scan_exec.rs to add an offset restore and commit mechanism
    * copy KafkaTopicPartition from Flink to satisfy Flink state compatibility requirements
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * There is currently no Kafka environment integration, so automated
    testing is not possible.
---
 .../connector/kafka/AuronKafkaSourceFunction.java  | 146 ++++++++++++++++++++-
 .../kafka/internals/KafkaTopicPartition.java       | 126 ++++++++++++++++++
 .../src/flink/kafka_scan_exec.rs                   |  64 ++++++++-
 3 files changed, 327 insertions(+), 9 deletions(-)

diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 369f4b1d..569e63a4 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -25,32 +25,51 @@ import org.apache.auron.flink.arrow.FlinkArrowReader;
 import org.apache.auron.flink.arrow.FlinkArrowUtils;
 import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
 import org.apache.auron.flink.runtime.operator.FlinkAuronFunction;
+import org.apache.auron.flink.table.data.AuronColumnarRowData;
 import org.apache.auron.flink.utils.SchemaConverters;
 import org.apache.auron.jni.AuronAdaptor;
 import org.apache.auron.jni.AuronCallNativeWrapper;
+import org.apache.auron.jni.JniBridge;
 import org.apache.auron.metric.MetricNode;
 import org.apache.auron.protobuf.KafkaFormat;
 import org.apache.auron.protobuf.KafkaScanExecNode;
 import org.apache.auron.protobuf.KafkaStartupMode;
 import org.apache.auron.protobuf.PhysicalPlanNode;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.SerializableObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Auron Kafka source function.
+ * Only support AT-LEAST ONCE semantics.
+ * If checkpoints are enabled, Kafka offsets are committed via Auron after a 
successful checkpoint.
+ * If checkpoints are disabled, Kafka offsets are committed periodically via 
Auron.
  */
-public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData> implements FlinkAuronFunction {
+public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData>
+        implements FlinkAuronFunction, CheckpointListener, 
CheckpointedFunction {
     private static final Logger LOG = 
LoggerFactory.getLogger(AuronKafkaSourceFunction.class);
-
     private final LogicalType outputType;
     private final String auronOperatorId;
     private final String topic;
@@ -60,9 +79,22 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private final int bufferSize;
     private final String startupMode;
     private transient PhysicalPlanNode physicalPlanNode;
+    // Flink Checkpoint-related, compatible with Flink Kafka Legacy source
+    /** State name of the consumer's partition offset states. */
+    private static final String OFFSETS_STATE_NAME = 
"topic-partition-offset-states";
+
+    private transient ListState<Tuple2<KafkaTopicPartition, Long>> 
unionOffsetStates;
+    /** Data for pending but uncommitted offsets. */
+    private transient LinkedMap pendingOffsetsToCommit;
+
+    private transient Map<Integer, Long> restoredOffsets;
+    private transient Map<Integer, Long> currentOffsets;
+    private final SerializableObject lock = new SerializableObject();
+
     private volatile boolean isRunning;
     private transient String auronOperatorIdWithSubtaskIndex;
     private transient MetricNode nativeMetric;
+    private transient ObjectMapper mapper;
 
     public AuronKafkaSourceFunction(
             LogicalType outputType,
@@ -91,7 +123,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         scanExecNode.setKafkaTopic(this.topic);
         scanExecNode.setKafkaPropertiesJson(this.kafkaPropertiesJson);
         
scanExecNode.setDataFormat(KafkaFormat.valueOf(this.format.toUpperCase(Locale.ROOT)));
-        ObjectMapper mapper = new ObjectMapper();
+        mapper = new ObjectMapper();
         
scanExecNode.setFormatConfigJson(mapper.writeValueAsString(formatConfig));
         scanExecNode.setBatchSize(this.bufferSize);
         if 
(this.format.equalsIgnoreCase(KafkaConstants.KAFKA_FORMAT_PROTOBUF)) {
@@ -103,10 +135,10 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             if (new File(pwd).exists()) {
                 File descFile = new File(pwd + "/" + pbDescFileName);
                 if (!descFile.exists()) {
-                    LOG.info("Auron kafka source writer pb desc file: " + 
pbDescFileName);
+                    LOG.info("Auron kafka source writer pb desc file: {}", 
pbDescFileName);
                     FileUtils.copyInputStreamToFile(in, descFile);
                 } else {
-                    LOG.warn("Auron kafka source pb desc file already exist, 
skip copy " + pbDescFileName);
+                    LOG.warn("Auron kafka source pb desc file already exist, 
skip copy {}", pbDescFileName);
                 }
             } else {
                 throw new RuntimeException("PWD is not exist");
@@ -120,6 +152,22 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
         sourcePlan.setKafkaScan(scanExecNode.build());
         this.physicalPlanNode = sourcePlan.build();
+
+        StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) 
getRuntimeContext();
+        boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
+        Map<String, Object> auronRuntimeInfo = new HashMap<>();
+        auronRuntimeInfo.put("subtask_index", 
runtimeContext.getIndexOfThisSubtask());
+        auronRuntimeInfo.put("num_readers", 
runtimeContext.getNumberOfParallelSubtasks());
+        auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
+        auronRuntimeInfo.put("restored_offsets", restoredOffsets);
+        JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo));
+        this.isRunning = true;
+        LOG.info(
+                "Auron kafka source init successful, Auron operator id: {}, 
enableCheckpoint is {}",
+                auronOperatorIdWithSubtaskIndex,
+                enableCheckpoint);
+        currentOffsets = new HashMap<>();
+        pendingOffsetsToCommit = new LinkedMap();
         this.isRunning = true;
     }
 
@@ -129,7 +177,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             @Override
             public void add(String name, long value) {
                 // TODO Integration with Flink metrics
-                LOG.info(String.format("Metric Auron Source: %s = %s", name, 
value));
+                LOG.info("Metric Auron Source: {} = {}", name, value);
             }
         };
         List<RowType.RowField> fieldList = new LinkedList<>();
@@ -150,10 +198,17 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
                             .getAuronConfiguration()
                             
.getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
             while (wrapper.loadNextBatch(batch -> {
+                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
                 FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, 
auronOutputRowType, 3);
                 for (int i = 0; i < batch.getRowCount(); i++) {
+                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) 
arrowReader.read(i);
+                    // update kafka partition and offsets
+                    tmpOffsets.put(tmpRowData.getInt(-3), 
tmpRowData.getLong(-2));
                     sourceContext.collect(arrowReader.read(i));
                 }
+                synchronized (lock) {
+                    currentOffsets = tmpOffsets;
+                }
             })) {}
             ;
         }
@@ -184,4 +239,83 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     public MetricNode getMetricNode() {
         return nativeMetric;
     }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        try {
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.debug(
+                        "Consumer subtask {} received confirmation for unknown 
checkpoint id {}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+                return;
+            }
+
+            @SuppressWarnings("unchecked")
+            Map<Integer, Long> offsets = (Map<Integer, Long>) 
pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            int subTaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+            if (offsets == null || offsets.size() == 0) {
+                LOG.info("Consumer subtask {} has empty checkpoint state.", 
subTaskIndex);
+                return;
+            }
+            String commitOffsetsKey = auronOperatorIdWithSubtaskIndex + 
"-offsets2commit";
+            LOG.info(
+                    "Subtask {} commit [{}] offsets for checkpoint: {}, 
offsets: {}",
+                    subTaskIndex,
+                    commitOffsetsKey,
+                    checkpointId,
+                    offsets);
+            JniBridge.putResource(commitOffsetsKey, 
mapper.writeValueAsString(offsets));
+        } catch (Exception e) {
+            LOG.error("NotifyCheckpointComplete error: ", e);
+            if (isRunning) {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        if (!isRunning) {
+            LOG.warn("Auron kafka source is not running, skip snapshot state");
+        } else {
+            Map<Integer, Long> copyCurrentOffsets;
+            synchronized (lock) {
+                // copy offsets, ensure that the corresponding offset has been 
dispatched to downstream.
+                copyCurrentOffsets = new HashMap<>(currentOffsets);
+            }
+            pendingOffsetsToCommit.put(context.getCheckpointId(), 
copyCurrentOffsets);
+            for (Map.Entry<Integer, Long> offset : 
copyCurrentOffsets.entrySet()) {
+                unionOffsetStates.add(Tuple2.of(new KafkaTopicPartition(topic, 
offset.getKey()), offset.getValue()));
+            }
+            LOG.info(
+                    "snapshotState for checkpointId: {}, currentOffsets: {}",
+                    context.getCheckpointId(),
+                    copyCurrentOffsets);
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+        this.unionOffsetStates = stateStore.getUnionListState(new 
ListStateDescriptor<>(
+                OFFSETS_STATE_NAME, TypeInformation.of(new 
TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
+        this.restoredOffsets = new HashMap<>();
+        if (context.isRestored()) {
+            for (Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionOffsetEntry : unionOffsetStates.get()) {
+                restoredOffsets.put(
+                        kafkaTopicPartitionOffsetEntry.f0.getPartition(), 
kafkaTopicPartitionOffsetEntry.f1);
+            }
+            LOG.info("Restore from state, restoredOffsets: {}", 
restoredOffsets);
+        } else {
+            LOG.info("Not restore from state.");
+        }
+    }
 }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 00000000..00ec4085
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Auron to maintain compatibility with Flink's existing legacy kafka state, 
this class implementation was copied.
+ * Flink's description of a partition in a Kafka topic. Serializable, and 
common across all Kafka
+ * consumer subclasses (0.8, 0.9, ...)
+ *
+ * <p>Note: This class must not change in its structure, because it would 
change the serialization
+ * format and make previous savepoints unreadable.
+ */
+@PublicEvolving
+public final class KafkaTopicPartition implements Serializable {
+
+    /**
+     * THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK READING 
OLD SERIALIZED
+     * INSTANCES FROM SAVEPOINTS.
+     */
+    private static final long serialVersionUID = 722083576322742325L;
+
+    // ------------------------------------------------------------------------
+
+    private final String topic;
+    private final int partition;
+    private final int cachedHash;
+
+    public KafkaTopicPartition(String topic, int partition) {
+        this.topic = requireNonNull(topic);
+        this.partition = partition;
+        this.cachedHash = 31 * topic.hashCode() + partition;
+    }
+
+    // ------------------------------------------------------------------------
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public String toString() {
+        return "KafkaTopicPartition{" + "topic='" + topic + '\'' + ", 
partition=" + partition + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        } else if (o instanceof KafkaTopicPartition) {
+            KafkaTopicPartition that = (KafkaTopicPartition) o;
+            return this.partition == that.partition && 
this.topic.equals(that.topic);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return cachedHash;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    public static String toString(Map<KafkaTopicPartition, Long> map) {
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<KafkaTopicPartition, Long> p : map.entrySet()) {
+            KafkaTopicPartition ktp = p.getKey();
+            sb.append(ktp.getTopic())
+                    .append(":")
+                    .append(ktp.getPartition())
+                    .append("=")
+                    .append(p.getValue())
+                    .append(", ");
+        }
+        return sb.toString();
+    }
+
+    public static String toString(List<KafkaTopicPartition> partitions) {
+        StringBuilder sb = new StringBuilder();
+        for (KafkaTopicPartition p : partitions) {
+            
sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
+        }
+        return sb.toString();
+    }
+
+    /** A {@link java.util.Comparator} for {@link KafkaTopicPartition}s. */
+    public static class Comparator implements 
java.util.Comparator<KafkaTopicPartition> {
+        @Override
+        public int compare(KafkaTopicPartition p1, KafkaTopicPartition p2) {
+            if (!p1.getTopic().equals(p2.getTopic())) {
+                return p1.getTopic().compareTo(p2.getTopic());
+            } else {
+                return Integer.compare(p1.getPartition(), p2.getPartition());
+            }
+        }
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs 
b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
index 0840b3d7..aa43e598 100644
--- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -38,7 +38,7 @@ use once_cell::sync::OnceCell;
 use rdkafka::{
     ClientConfig, ClientContext, Offset, TopicPartitionList,
     config::RDKafkaLogLevel,
-    consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer},
+    consumer::{CommitMode, Consumer, ConsumerContext, Rebalance, 
StreamConsumer},
     error::KafkaResult,
 };
 use sonic_rs::{JsonContainerTrait, JsonValueTrait};
@@ -245,6 +245,13 @@ fn read_serialized_records_from_kafka(
         .get("subtask_index")
         .as_i64()
         .expect("subtask_index is not valid json") as i32;
+    let enable_checkpoint = task_json
+        .get("enable_checkpoint")
+        .as_bool()
+        .expect("enable_checkpoint is not valid json");
+    let restored_offsets = task_json
+        .get("restored_offsets")
+        .expect("restored_offsets is not valid json");
     let kafka_properties = 
sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
         .expect("kafka_properties_json is not valid json");
     let mut config = ClientConfig::new();
@@ -261,7 +268,12 @@ fn read_serialized_records_from_kafka(
                     .expect("kafka property value is not valid json string"),
             );
         });
-
+    if enable_checkpoint {
+        config.set("enable.auto.commit", "false");
+    } else {
+        config.set("enable.auto.commit", "true");
+    }
+    log::info!("Subtask {subtask_index} consumed kafka config is {config:?}");
     let consumer: Arc<LoggingConsumer> = Arc::new(
         config
             .create_with_context(context)
@@ -314,7 +326,16 @@ fn read_serialized_records_from_kafka(
     log::info!("Subtask {subtask_index} consumed partitions {partitions:?}");
     let mut partition_list = 
TopicPartitionList::with_capacity(partitions.len());
     for partition in partitions.iter() {
-        partition_list.add_partition_offset(&kafka_topic, *partition, offset);
+        let partition_key = partition.to_string();
+        let partition_offset = if let Some(val) = 
restored_offsets.get(&partition_key) {
+            Offset::Offset(
+                val.as_i64()
+                    .expect("restored_offsets value is not valid json"),
+            )
+        } else {
+            offset
+        };
+        partition_list.add_partition_offset(&kafka_topic, *partition, 
partition_offset);
     }
     consumer
         .assign(&partition_list)
@@ -360,6 +381,43 @@ fn read_serialized_records_from_kafka(
                     ],
                 )?;
                 sender.send(batch).await;
+                if enable_checkpoint {
+                    // if checkpoint is enabled, commit offsets to kafka
+                    let offset_to_commit = auron_operator_id.clone() + 
"-offsets2commit";
+                    let resource_id = jni_new_string!(&offset_to_commit)?;
+                    let java_json_str =
+                        
jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?;
+                    if !java_json_str.0.is_null() {
+                        let offset_json_str = 
jni_get_string!(java_json_str.as_obj().into())
+                            .expect("java_json_str is not valid java string");
+                        let offsets_to_commit =
+                            
sonic_rs::from_str::<sonic_rs::Value>(&offset_json_str)
+                                .expect("offset_json_str is not valid json");
+                        let mut partition_list = 
TopicPartitionList::with_capacity(
+                            offsets_to_commit
+                                .as_object()
+                                .expect("offsets_to_commit is not valid json")
+                                .len(),
+                        );
+                        if let Some(obj) = offsets_to_commit.as_object() {
+                            if !obj.is_empty() {
+                                for (partition, offset) in obj {
+                                    partition_list.add_partition_offset(
+                                        &kafka_topic,
+                                        partition
+                                            .parse::<i32>()
+                                            .expect("partition is not valid 
json"),
+                                        Offset::Offset(
+                                            offset.as_i64().expect("offset is 
not valid json"),
+                                        ),
+                                    );
+                                }
+                                log::info!("auron consumer to commit offset: 
{partition_list:?}");
+                                consumer.commit(&partition_list, 
CommitMode::Async);
+                            }
+                        }
+                    }
+                }
             }
         }))
 }

Reply via email to