[
https://issues.apache.org/jira/browse/SPARK-49499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rachel Bushrian updated SPARK-49499:
------------------------------------
Description:
I have a Spark Streaming job that reads data from a Delta table, performs
transformations, and writes the data to Kafka in Avro format. However, in the
event of a failure, Spark logs the logical plan, which inadvertently exposes
all Kafka properties, including the sensitive mTLS private key and certificates.
The code:
{code:java}
// Read the Delta table as a stream
val df = spark.readStream.format("delta")
  .option("ignoreCorruptFiles", "true")
  .option("path", fullPath)
  .schema(schema.get)
  .load()

val transformedDF = df.transform(...)

// config.conf carries the Kafka options, including the mTLS key material
transformedDF.writeStream
  .format("kafka")
  .option("checkpointLocation", cpPath)
  .options(config.conf)
  .trigger(trigger.getOrElse(Trigger.ProcessingTime(0L)))
  .start()

df.sparkSession.streams.awaitAnyTermination()
{code}
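For context, this is the kind of masking one would expect Spark to apply before printing the plan. The helper below is purely hypothetical (the name, the key pattern, and the placeholder value are illustrative); it can only protect places where the application itself logs the configuration, not the plan that Spark internals print on failure, which is the subject of this report:

```scala
// Hypothetical sketch: mask sensitive Kafka option values before a config
// map is ever logged. The set of "sensitive" key fragments is an assumption.
object KafkaConfRedactor {
  private val sensitiveKey =
    "(?i).*(keystore\\.key|truststore\\.certificates|certificate\\.chain|password|secret|token).*".r

  // Returns a copy of the map with sensitive values replaced by a placeholder.
  def redact(conf: Map[String, String]): Map[String, String] =
    conf.map {
      case (k, _) if sensitiveKey.pattern.matcher(k).matches() =>
        k -> "*********(redacted)"
      case kv => kv
    }
}
```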
The exception:
{code:java}
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException:
org.apache.hadoop.fs.s3a.RemoteFileChangedException: open s3a://.../_delta_log/00000000000001258632.json at 0 on s3a://.../_delta_log/00000000000001258632.json:
software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 412, Request ID: tx0000012a53fe47376ba53-0066cf6372-149bb8-default):PreconditionFailed `s3a://.../_delta_log/00000000000001258632.json': : null (Service: S3, Status Code: 412, Request ID: tx0000012a53fe47376ba53-0066cf6372-149bb8-default)

=== Streaming Query ===
Identifier: [id = 0d9cd948-f417-4688-8c02-355a772299a4, runId = f420b516-3e25-4443-8ae3-9f511b37ff8c]
Current Committed Offsets: {DeltaSource[s3a://...]: {"sourceVersion":1,"reservoirId":"4c9d94b9-37b7-4aa7-bc9e-e914dfdc4ea2","reservoirVersion":1258632,"index":-1,"isStartingVersion":false}}
Current Available Offsets: {DeltaSource[s3a://...]: {"sourceVersion":1,"reservoirId":"4c9d94b9-37b7-4aa7-bc9e-e914dfdc4ea2","reservoirVersion":1258632,"index":-1,"isStartingVersion":false}}

Current State: ACTIVE
Thread State: RUNNABLE
{code}
{code:java}
Logical Plan: WriteToMicroBatchDataSource org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@7945310f, 0d9cd948-f417-4688-8c02-355a772299a4,
[kafka.ssl.keystore.type=PEM,
 subscribe=dns-correlated-events,
 kafka.ssl.truststore.certificates=-----BEGIN CERTIFICATE----- XXXX -----END CERTIFICATE-----,
 kafka.max.request.size=2000000,
 kafka.bootstrap.servers=kafkacluster-kafka-bootstrap.kafka.svc.cluster.local:9094,
 kafka.ssl.keystore.key=-----BEGIN PRIVATE KEY----- XXXX -----END PRIVATE KEY-----,
 checkpointLocation=s3a://…,
 topic=dns-correlated-events,
 kafka.ssl.truststore.type=PEM,
 kafka.ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- XXXX -----END CERTIFICATE-----,
 kafka.security.protocol=SSL],
Append
+- Project [to_aero(struct(ts, ts#744L, ...
{code}
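A possible partial mitigation (sketch only, under the assumption that exposure elsewhere also matters): Spark masks matching config keys in logs and the UI via {{spark.redaction.regex}}. Per this report, the streaming logical plan printed by the exception does not appear to go through that redaction, so widening the pattern would not fix this bug, only reduce exposure in other log paths:

```scala
// Config fragment (sketch): widen Spark's built-in redaction pattern so that
// SSL keystore/truststore keys are also masked where redaction IS applied.
val spark = SparkSession.builder()
  .config("spark.redaction.regex",
    "(?i)secret|password|token|access[.]key|ssl[.]keystore|ssl[.]truststore")
  .getOrCreate()
```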
> Spark Streaming Exposes Kafka Security Keys in Logs
> ---------------------------------------------------
>
> Key: SPARK-49499
> URL: https://issues.apache.org/jira/browse/SPARK-49499
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.1
> Reporter: Rachel Bushrian
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]