[https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Maxim Martynov updated SPARK-44774:
-----------------------------------
Description:
I'm trying to write a batch DataFrame to a Kafka topic with {{mode="error"}}, but
when the topic already exists no exception is raised. Instead, the data is
appended to the topic.
Steps to reproduce:
1. Start Kafka with the following {{docker-compose.yml}}:
{code:yaml}
version: '3.9'
services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'
  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
      - "9093:9093"
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'no'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper
{code}
{code:bash}
docker-compose up -d
{code}
2. Start a Spark session:
{code:bash}
pip install "pyspark[sql]==3.4.1"
{code}
{code:python}
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1")
    .getOrCreate()
)
{code}
3. Create a DataFrame and write it to Kafka twice: first with {{mode="append"}}
to create the topic, then with {{mode="error"}}, which should raise an exception
because the topic already exists:
{code:python}
df = spark.createDataFrame([{"value": "string"}])
kafka_options = {"kafka.bootstrap.servers": "localhost:9093", "topic": "new_topic"}
# First write creates the topic (topic auto-creation is enabled in docker-compose.yml)
df.write.format("kafka").options(**kafka_options).mode("append").save()
# Second write should fail because the topic already exists, but no exception is raised
df.write.format("kafka").options(**kafka_options).mode("error").save()
{code}
4. Check the topic content: it contains 2 rows instead of 1:
{code:python}
(spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9093")
    .option("subscribe", "new_topic")
    .load()
    .show(10, False))
{code}
{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}
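The {{value}} column is displayed as raw bytes. A quick check in plain Python confirms that both rows carry the same {{"string"}} payload written in step 3, i.e. the second write was appended rather than rejected. (Inside Spark, the usual equivalent is {{selectExpr("CAST(value AS STRING)")}} on the DataFrame returned by {{load()}}.)

{code:python}
# Bytes shown in the "value" column of the output above
raw = "73 74 72 69 6E 67"

# bytes.fromhex() ignores the spaces between byte pairs
payload = bytes.fromhex(raw).decode("utf-8")
print(payload)  # string
{code}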
It looks like KafkaSourceProvider checks the save mode but does not otherwise use it:
https://github.com/apache/spark/blob/v3.4.1/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178
So the data is always appended to the topic.
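To make the reported behavior concrete, here is a simplified Python model of the write path. It is a hypothetical sketch, not Spark's Scala code: {{SaveMode}}, {{observed_kafka_write}} and {{expected_kafka_write}} are illustrative names, the exceptions are stand-ins, and it assumes that the mode check only rejects modes Kafka cannot support (Overwrite and Ignore) while every other mode falls through to the same append path.

{code:python}
from enum import Enum


class SaveMode(Enum):
    """Stand-in for Spark's SaveMode (illustration only)."""
    APPEND = "append"
    OVERWRITE = "overwrite"
    ERROR_IF_EXISTS = "error"
    IGNORE = "ignore"


def _reject_unsupported(mode: SaveMode) -> None:
    # Assumption: Overwrite and Ignore are rejected before any data is written
    if mode in (SaveMode.OVERWRITE, SaveMode.IGNORE):
        raise ValueError(f"save mode {mode.value!r} not allowed for Kafka")


def observed_kafka_write(mode: SaveMode, topic_exists: bool) -> str:
    """Hypothetical model of the behavior reported in this issue."""
    _reject_unsupported(mode)
    # topic_exists is never consulted: ErrorIfExists falls through
    # to the same path as Append
    return "appended"


def expected_kafka_write(mode: SaveMode, topic_exists: bool) -> str:
    """Hypothetical model of what ErrorIfExists is expected to do."""
    _reject_unsupported(mode)
    if mode is SaveMode.ERROR_IF_EXISTS and topic_exists:
        raise FileExistsError("topic already exists")
    return "appended"
{code}

With these definitions, {{observed_kafka_write(SaveMode.ERROR_IF_EXISTS, topic_exists=True)}} returns {{"appended"}}, matching the second row in step 4, whereas {{expected_kafka_write}} raises {{FileExistsError}} for the same arguments.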
was:
It looks like KafkaSourceProvider checks the save mode but does not otherwise use it:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178
So the data is always appended to the topic.
> SaveMode.ErrorIfExists does not work with kafka-sql
> ---------------------------------------------------
>
> Key: SPARK-44774
> URL: https://issues.apache.org/jira/browse/SPARK-44774
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.1
> Reporter: Maxim Martynov
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)