Hi Brain, We’re using consumerFactoryFn that reads certs from GCP and copying those to local FS on each Dataflow worker. Exception raised after consumerFactoryFn when Kafka tries to read certs from local fs using KeyStore.load(InputStream is, String pass).
This code we using in consumerFactoryFn to read from GCP and writing to local fs : try (ReadableByteChannel readerChannel = FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) { try (FileChannel writeChannel = FileChannel.open(Paths.get(outputFilePath), options)) { writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE); } } Thank you, Ilya From: Brian Hulette <bhule...@google.com> Reply to: "dev@beam.apache.org" <dev@beam.apache.org> Date: Wednesday, 26 May 2021, 21:32 To: dev <dev@beam.apache.org> Cc: Artur Khanin <artur.kha...@akvelon.com> Subject: Re: KafkaIO SSL issue I came across this relevant StackOverflow question: https://stackoverflow.com/questions/7399154/pkcs12-derinputstream-getlength-exception They say the error is from a call to `KeyStore.load(InputStream is, String pass);` (consistent with your stacktrace), and can occur whenever there's an issue with the InputStream passed to it. Who created the InputStream used in this case, is it Kafka code, Beam code, or your consumerFactoryFn? Brian On Mon, May 24, 2021 at 4:01 AM Ilya Kozyrev <ilya.kozy...@akvelon.com<mailto:ilya.kozy...@akvelon.com>> wrote: Hi community, We have an issue with KafkaIO in the case of using a secure connection SASL SSL to the Confluent Kafka 5.5.1. When we trying to configure the Kafka consumer using consumerFactoryFn, we have an irregular issue related to certificate reads from the file system. Irregular means, that different Dataflow jobs with the same parameters and certs might be failed and succeeded. Store cert types for Keystore and Truststore are specified explicitly in consumer config. In our case, it's JKS for both certs. Stacktrase: Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/kafka.truststore.jks of type JKS at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:289) at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:153) ... 23 more Caused by: java.security.cert.CertificateException: Unable to initialize, java.io.IOException: DerInputStream.getLength(): lengthTag=65, too big. at sun.security.x509.X509CertImpl.<init>(X509CertImpl.java:198) at sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:102) at java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339) at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:755) at sun.security.provider.JavaKeyStore$JKS.engineLoad(JavaKeyStore.java:56) at sun.security.provider.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:224) at sun.security.provider.JavaKeyStore$DualFormatJKS.engineLoad(JavaKeyStore.java:70) at java.security.KeyStore.load(KeyStore.java:1445) at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:286) ... 24 more /tmp/kafka.truststore.jks is a path that’s used in consumerFactoryFn to load certs from GCP to the worker's local file system. Does anyone have any ideas on how to fix this issue? Thank you, Ilya