[ 
https://issues.apache.org/jira/browse/SPARK-49069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siyun Liang updated SPARK-49069:
--------------------------------
    Summary: Kafka Offsets Not Committed to Consumer Group in Spark Structured 
Streaming  (was: Spark Structured Streaming unable to send back offset to Kafka 
consumer group)

> Kafka Offsets Not Committed to Consumer Group in Spark Structured Streaming
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-49069
>                 URL: https://issues.apache.org/jira/browse/SPARK-49069
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.1
>         Environment: Spark Version: 3.5.1
> Kafka Version: 0.11+
> Spark Kafka package: org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
>            Reporter: Siyun Liang
>            Priority: Major
>              Labels: patch
>
> h3. Summary
> Unable to commit Kafka offsets back to Kafka consumer group when using Spark 
> Structured Streaming.
> h3. Description
> I am using Spark Structured Streaming to read data from a Kafka topic, 
> process it, and write the results to the console. While the streaming job 
> reads and processes data from Kafka correctly, it fails to commit the offsets 
> back to the Kafka consumer group. As a result, Kafka does not recognize where 
> the consumer left off, causing potential reprocessing of messages.
> h3. Configuration
> Here is the relevant part of my Spark application configuration:
> {code:java}
> spark = (
>         SparkSession.builder
>         .appName("Test-KafkaSparkStructuredStreaming")
>         .config("spark.ui.enabled", "true")
>         .config("spark.ui.port", test_port)
>         .config("spark.jars.packages", 
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
>         .config("spark.dynamicAllocation.enabled", "true")
>         .config("spark.dynamicAllocation.minExecutors", "1")
>         .config("spark.dynamicAllocation.maxExecutors", "1")
>         .config("spark.kafka.consumer.commit.offsets.on.checkpoint", "true")
>         .getOrCreate()
>     )
> df = (
>         spark.readStream.format("kafka")
>         .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
>         .option("subscribe", kafka_topic)
>         .option("auto.offset.reset", "earliest")
>         .option("group.id", "Test_group")
>         .load()
>     )
> # Spark Streaming Process code ...
> query = (
>         result_df.writeStream.outputMode("append")
>         .format("console")
>         .option(
>             "checkpointLocation",
>             "spark-checkpoint",
>         )
>         .start()
>     )
> query.awaitTermination()
> spark.stop(){code}
> h3. Steps to Reproduce
>  # Set up a Kafka topic with data.
>  # Configure a Spark Structured Streaming job to read from the Kafka topic 
> using the above configuration.
>  # Start the streaming job.
>  # Observe that data is read and processed correctly, but offsets are not 
> committed back to Kafka.
> h3. Expected Behavior
> Kafka offsets should be committed back to the Kafka consumer group, allowing 
> Kafka to track the last consumed offsets.
> h3. Actual Behavior
> Kafka offsets are not committed back to the Kafka consumer group. As a 
> result, Kafka does not recognize where the consumer left off, potentially 
> causing reprocessing of messages.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to