[ 
https://issues.apache.org/jira/browse/FLINK-32773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mehdi updated FLINK-32773:
--------------------------
    Description: 
When having a Kafka connector using SSL to connect the comment pattern used in 
the Flink SQL Runner example is considering `------{-}BEGIN 
CERTIFICATE--{-}---{-}` and `{-}---{-}END CERTIFICATE{-}----` as comments and 
is not setting correctly the certificates used in SSL.
h3. How to reproduce

This issue can be reproduced as follows (for Kafka) when having ssl enabled in 
Kafka.
 * In 
[https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-scripts/statement-set.sql]
 you should have the property {{properties.ssl.truststore.certificates}}

*Example*

{{WITH (
    'connector' = 'kafka',
    ...
    'properties.ssl.truststore.certificates' = '-----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
    '
);}} * Deploy and check that there is an error to create the Kafka 
producer/consumer

{{org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439) 
~[kafka-clients-3.2.3.jar:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) 
~[kafka-clients-3.2.3.jar:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) 
~[kafka-clients-3.2.3.jar:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) 
~[kafka-clients-3.2.3.jar:?]
        at 
org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
 ~[flink-connector-kafka-1.17.1.jar:1.17.1]
        at 
org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182) 
~[flink-connector-kafka-1.17.1.jar:1.17.1]
        at 
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
 ~[flink-connector-kafka-1.17.1.jar:1.17.1]
        at 
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57) 
~[flink-connector-kafka-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-dist-1.17.1.jar:1.17.1]
        at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: No 
matching CERTIFICATE entries in PEM file}}

  was:When having a Kafka connector using SSL to connect the comment pattern 
used in the Flink SQL Runner example is considering `-----BEGIN 
CERTIFICATE-----` and `-----END CERTIFICATE-----` as comments and is not 
setting correctly the certificates used in SSL.


> Support PEM certificates in the SQL Runner example
> --------------------------------------------------
>
>                 Key: FLINK-32773
>                 URL: https://issues.apache.org/jira/browse/FLINK-32773
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Mehdi
>            Priority: Major
>
> When having a Kafka connector using SSL to connect the comment pattern used 
> in the Flink SQL Runner example is considering `------{-}BEGIN 
> CERTIFICATE--{-}---{-}` and `{-}---{-}END CERTIFICATE{-}----` as comments and 
> is not setting correctly the certificates used in SSL.
> h3. How to reproduce
> This issue can be reproduced as follows (for Kafka) when having ssl enabled 
> in Kafka.
>  * In 
> [https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-scripts/statement-set.sql]
>  you should have the property {{properties.ssl.truststore.certificates}}
> *Example*
> {{WITH (
>     'connector' = 'kafka',
>     ...
>     'properties.ssl.truststore.certificates' = '-----BEGIN CERTIFICATE-----
>     ...
>     -----END CERTIFICATE-----
>     '
> );}} * Deploy and check that there is an error to create the Kafka 
> producer/consumer
> {{org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
>  ~[kafka-clients-3.2.3.jar:?]
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
>  ~[kafka-clients-3.2.3.jar:?]
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
>  ~[kafka-clients-3.2.3.jar:?]
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
>  ~[kafka-clients-3.2.3.jar:?]
>       at 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
>  ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182)
>  ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
>  ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
>  ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) 
> [flink-dist-1.17.1.jar:1.17.1]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
> [flink-dist-1.17.1.jar:1.17.1]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-dist-1.17.1.jar:1.17.1]
>       at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: No 
> matching CERTIFICATE entries in PEM file}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to