Hi Brain,

We’re using consumerFactoryFn that reads certs from GCP and copying those to 
local FS on each Dataflow worker.
Exception raised after consumerFactoryFn when Kafka tries to read certs from 
local fs using KeyStore.load(InputStream is, String pass).

This code we using in consumerFactoryFn to read from GCP and writing to local 
fs :
try (ReadableByteChannel readerChannel =
          
FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) {
        try (FileChannel writeChannel = 
FileChannel.open(Paths.get(outputFilePath), options)) {
          writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE);
        }
}

Thank you,
Ilya

From: Brian Hulette <bhule...@google.com>
Reply to: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Wednesday, 26 May 2021, 21:32
To: dev <dev@beam.apache.org>
Cc: Artur Khanin <artur.kha...@akvelon.com>
Subject: Re: KafkaIO SSL issue

I came across this relevant StackOverflow question: 
https://stackoverflow.com/questions/7399154/pkcs12-derinputstream-getlength-exception

They say the error is from a call to `KeyStore.load(InputStream is, String 
pass);` (consistent with your stacktrace), and can occur whenever there's an 
issue with the InputStream passed to it. Who created the InputStream used in 
this case, is it Kafka code, Beam code, or your consumerFactoryFn?



Brian

On Mon, May 24, 2021 at 4:01 AM Ilya Kozyrev 
<ilya.kozy...@akvelon.com<mailto:ilya.kozy...@akvelon.com>> wrote:
Hi community,

We have an issue with KafkaIO in the case of using a secure connection SASL SSL 
to the Confluent Kafka 5.5.1. When we trying to configure the Kafka consumer 
using consumerFactoryFn, we have an irregular issue related to certificate 
reads from the file system. Irregular means, that different Dataflow jobs with 
the same parameters and certs might be failed and succeeded. Store cert types 
for Keystore and Truststore are specified explicitly in consumer config. In our 
case, it's JKS for both certs.

Stacktrase:
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore 
/tmp/kafka.truststore.jks of type JKS
      at 
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:289)
      at 
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:153)
      ... 23 more
Caused by: java.security.cert.CertificateException: Unable to initialize, 
java.io.IOException: DerInputStream.getLength(): lengthTag=65, too big.
      at sun.security.x509.X509CertImpl.<init>(X509CertImpl.java:198)
      at 
sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:102)
      at 
java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339)
      at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:755)
      at sun.security.provider.JavaKeyStore$JKS.engineLoad(JavaKeyStore.java:56)
      at 
sun.security.provider.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:224)
      at 
sun.security.provider.JavaKeyStore$DualFormatJKS.engineLoad(JavaKeyStore.java:70)
      at java.security.KeyStore.load(KeyStore.java:1445)
      at 
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:286)
      ... 24 more

/tmp/kafka.truststore.jks is a path that’s used in consumerFactoryFn to load 
certs from GCP to the worker's local file system.

Does anyone have any ideas on how to fix this issue?


Thank you,
Ilya

Reply via email to