Siyun Liang created SPARK-49069:
-----------------------------------

             Summary: Spark Structured Streaming unable to send back offsets to Kafka consumer group
                 Key: SPARK-49069
                 URL: https://issues.apache.org/jira/browse/SPARK-49069
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.5.1
         Environment: Spark Version: 3.5.1

Kafka Version: 0.11+

Spark Kafka package: org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
            Reporter: Siyun Liang


h3. Summary

Unable to commit Kafka offsets back to the Kafka consumer group when using Spark Structured Streaming.
h3. Description

I am using Spark Structured Streaming to read data from a Kafka topic, process 
it, and write the results to the console. While the streaming job reads and 
processes data from Kafka correctly, it fails to commit the offsets back to the 
Kafka consumer group. As a result, Kafka does not recognize where the consumer 
left off, causing potential reprocessing of messages.
h3. Configuration

Here is the relevant part of my Spark application configuration:
{code:java}
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Test-KafkaSparkStructuredStreaming")
    .config("spark.ui.enabled", "true")
    .config("spark.ui.port", test_port)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "1")
    .config("spark.kafka.consumer.commit.offsets.on.checkpoint", "true")
    .getOrCreate()
)

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .option("auto.offset.reset", "earliest")
    .option("group.id", "Test_group")
    .load()
)

# Spark Streaming Process code ...

query = (
    result_df.writeStream.outputMode("append")
    .format("console")
    .option("checkpointLocation", "spark-checkpoint")
    .start()
)

query.awaitTermination()

spark.stop(){code}
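For context while reproducing: Structured Streaming records its progress in the checkpoint directory rather than in Kafka's consumer-group offsets, so the last processed offsets can be recovered from the checkpoint even when the group shows none committed. A minimal sketch, assuming the Spark 3.5 checkpoint layout where each `offsets/<batchId>` file holds a version line (`v1`), a batch-metadata line, then one JSON line per source:
{code:java}
import json
import os

def latest_kafka_offsets(checkpoint_dir):
    """Return {topic: {partition: offset}} from the newest
    offsets/<batchId> file in a Structured Streaming checkpoint.
    Layout assumption: line 1 is the version ("v1"), line 2 is
    batch metadata, and each following line is one source's
    offsets serialized as JSON."""
    offsets_dir = os.path.join(checkpoint_dir, "offsets")
    batches = [name for name in os.listdir(offsets_dir) if name.isdigit()]
    if not batches:
        return {}
    newest = max(batches, key=int)
    with open(os.path.join(offsets_dir, newest)) as fh:
        lines = fh.read().splitlines()
    # lines[0] == "v1", lines[1] == batch metadata; the first
    # source's offsets are on the third line.
    return json.loads(lines[2])
{code}
Running this against the `spark-checkpoint` directory from the repro shows Spark is advancing through the topic even though `kafka-consumer-groups.sh` reports nothing for `Test_group`.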
h3. Steps to Reproduce
 # Set up a Kafka topic with data.
 # Configure a Spark Structured Streaming job to read from the Kafka topic using the above configuration.
 # Start the streaming job.
 # Observe that data is read and processed correctly, but offsets are not committed back to Kafka.
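Step 4 can be observed from the Kafka side with the stock consumer-groups tool (broker address `localhost:9092` is an assumption; substitute your own):
{code:java}
# Describe the group used in the repro above. While this bug is
# present, CURRENT-OFFSET stays empty ("-") for Test_group even
# though the streaming query keeps processing new records.
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group Test_group
{code}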

h3. Expected Behavior

Kafka offsets should be committed back to the Kafka consumer group, allowing 
Kafka to track the last consumed offsets.
h3. Actual Behavior

Kafka offsets are not committed back to the Kafka consumer group. As a result, 
Kafka does not recognize where the consumer left off, potentially causing 
reprocessing of messages.
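As a workaround (not a fix for this issue), the end offsets of each batch are exposed in query progress events, so they can be committed to Kafka out of band from a {{StreamingQueryListener}}. A minimal sketch, assuming the Spark 3.5 progress JSON shape where a Kafka source's {{endOffset}} is a JSON string of {{{}{"topic": {"partition": offset}}{}}}; the group id and broker address below are hypothetical:
{code:java}
import json

def offsets_to_commit(progress):
    """Extract {(topic, partition): next_offset} from one
    StreamingQueryProgress dictionary (query.lastProgress).
    Assumption: each Kafka source reports endOffset as a JSON
    string shaped like {"topic": {"partition": offset}}."""
    commits = {}
    for source in progress.get("sources", []):
        end = source.get("endOffset")
        if not end:
            continue
        for topic, partitions in json.loads(end).items():
            for partition, offset in partitions.items():
                commits[(topic, int(partition))] = offset
    return commits

# Commit side, sketched with the kafka-python package (requires a
# reachable broker, so shown as comments only):
#
# from kafka import KafkaConsumer, TopicPartition
# from kafka.structs import OffsetAndMetadata
# consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
#                          group_id="Test_group",
#                          enable_auto_commit=False)
# consumer.commit({TopicPartition(t, p): OffsetAndMetadata(o, None)
#                  for (t, p), o in offsets_to_commit(progress).items()})
{code}
Hooking {{offsets_to_commit}} into {{onQueryProgress}} would let external tooling see consumer lag, but the checkpoint remains the source of truth for Spark's own recovery.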

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
