[
https://issues.apache.org/jira/browse/FLINK-32773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mehdi updated FLINK-32773:
--------------------------
Description:
When having a Kafka connector using SSL to connect the comment pattern used in
the Flink SQL Runner example is considering
{{-----BEGIN CERTIFICATE----- and }}{{-----END CERTIFICATE-----}}
as comments and is not setting correctly the certificates used in SSL.
h3. How to reproduce
This issue can be reproduced as follows (for Kafka) when having ssl enabled in
Kafka.
* In
[https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-scripts/statement-set.sql]
you should have the property {{properties.ssl.truststore.certificates}}
*Example*
{{WITH (
'connector' = 'kafka',
...
'properties.ssl.truststore.certificates' = '----{-}BEGIN CERTIFICATE{-}----
...
----{-}END CERTIFICATE{-}----
'
);}} * Deploy and check that there is an error to create the Kafka
producer/consumer
{{org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
[flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: No
matching CERTIFICATE entries in PEM file}}
was:
When having a Kafka connector using SSL to connect the comment pattern used in
the Flink SQL Runner example is considering `------{-}BEGIN
CERTIFICATE--{-}---{-}` and `{-}---{-}END CERTIFICATE{-}----` as comments and
is not setting correctly the certificates used in SSL.
h3. How to reproduce
This issue can be reproduced as follows (for Kafka) when having ssl enabled in
Kafka.
* In
[https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-scripts/statement-set.sql]
you should have the property {{properties.ssl.truststore.certificates}}
*Example*
{{WITH (
'connector' = 'kafka',
...
'properties.ssl.truststore.certificates' = '-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
'
);}} * Deploy and check that there is an error to create the Kafka
producer/consumer
{{org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
[flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: No
matching CERTIFICATE entries in PEM file}}
> Support PEM certificates in the SQL Runner example
> --------------------------------------------------
>
> Key: FLINK-32773
> URL: https://issues.apache.org/jira/browse/FLINK-32773
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Reporter: Mehdi
> Priority: Major
> Labels: pull-request-available
>
> When having a Kafka connector using SSL to connect the comment pattern used
> in the Flink SQL Runner example is considering
> {{-----BEGIN CERTIFICATE----- and }}{{-----END CERTIFICATE-----}}
> as comments and is not setting correctly the certificates used in SSL.
> h3. How to reproduce
> This issue can be reproduced as follows (for Kafka) when having ssl enabled
> in Kafka.
> * In
> [https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-scripts/statement-set.sql]
> you should have the property {{properties.ssl.truststore.certificates}}
> *Example*
> {{WITH (
> 'connector' = 'kafka',
> ...
> 'properties.ssl.truststore.certificates' = '----{-}BEGIN CERTIFICATE{-}----
> ...
> ----{-}END CERTIFICATE{-}----
> '
> );}} * Deploy and check that there is an error to create the Kafka
> producer/consumer
> {{org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
> ~[kafka-clients-3.2.3.jar:?]
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
> ~[kafka-clients-3.2.3.jar:?]
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
> ~[kafka-clients-3.2.3.jar:?]
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
> ~[kafka-clients-3.2.3.jar:?]
> at
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
> at
> org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182)
> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
> at
> org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
> at
> org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
> [flink-dist-1.17.1.jar:1.17.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> [flink-dist-1.17.1.jar:1.17.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> [flink-dist-1.17.1.jar:1.17.1]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: No
> matching CERTIFICATE entries in PEM file}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)