Oh I guess you are running the KafkaToPubsub example pipeline [1]? The code
you copied is here [2].
Based on that code I think Kafka is in control of creating the InputStream,
since you're just passing a file path in through the config object. So
either Kafka is creating a bad InputStream (seems unlikely), or there's
something wrong with /tmp/kafka.truststore.jks. Maybe it was cleaned up
while Kafka was reading it, or the file is somehow corrupt?

Looking at the code you copied, I suppose it's possible you're not writing
the full file to the local disk. The javadoc for transferFrom [3] states:

> Fewer than the requested number of bytes will be transferred if the
> source channel has fewer than count bytes remaining, **or if the source
> channel is non-blocking and has fewer than count bytes immediately
> available in its input buffer.**

Is it possible sometimes you're hitting this second case and the whole file
isn't being read? I don't know if readerChannel is blocking or not. You
might check by adding a log statement that prints the number of bytes that
are transferred to see if that correlates with the failure.
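To make that concrete, a defensive version could loop transferFrom and log how many bytes each call moves (just a sketch with plain local channels standing in for your Beam FileSystems channel; copyFully is a hypothetical helper, not anything in your code):

```java
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferFromCheck {

  // Loop until transferFrom stops moving bytes, logging each chunk. For a
  // blocking source channel a return of 0 means end-of-stream; for a
  // non-blocking channel 0 is ambiguous (no data *yet*), which is exactly
  // the hazard the transferFrom javadoc describes.
  static long copyFully(ReadableByteChannel in, FileChannel out) throws IOException {
    long position = 0;
    long transferred;
    while ((transferred = out.transferFrom(in, position, 8192)) > 0) {
      System.out.println("transferred " + transferred + " bytes at offset " + position);
      position += transferred;
    }
    return position;
  }

  public static void main(String[] args) throws IOException {
    Path src = Files.createTempFile("kafka", ".truststore");
    Path dst = Files.createTempFile("copy", ".truststore");
    Files.write(src, new byte[20_000]); // stand-in for the real truststore bytes

    try (ReadableByteChannel reader = Channels.newChannel(Files.newInputStream(src));
         FileChannel writer = FileChannel.open(dst, StandardOpenOption.WRITE)) {
      long total = copyFully(reader, writer);
      System.out.println("total=" + total + " expected=" + Files.size(src));
    }
  }
}
```

If the logged total ever comes up short of the source size right before a failed job, that would pretty strongly implicate the partial-transfer case.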

Someone else on this list may have advice on a more robust way to copy a
file from remote storage.
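For what it's worth, one simpler option (a sketch; it assumes you can wrap the channel, since FileSystems.open returns a ReadableByteChannel) is to hand the channel to Files.copy, which loops internally until end-of-stream and throws on failure rather than silently truncating:

```java
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CopyViaFilesCopy {
  public static void main(String[] args) throws Exception {
    Path src = Files.createTempFile("kafka", ".truststore");
    Path dst = Files.createTempFile("copy", ".truststore");
    Files.write(src, new byte[50_000]); // stand-in for the remote cert bytes

    // In the real pipeline the channel would come from FileSystems.open(...);
    // here a local channel stands in for it.
    try (ReadableByteChannel reader = Channels.newChannel(Files.newInputStream(src));
         InputStream in = Channels.newInputStream(reader)) {
      // Files.copy reads until end-of-stream, so a short read can't
      // quietly produce a truncated truststore on the worker.
      Files.copy(in, dst, StandardCopyOption.REPLACE_EXISTING);
    }
    System.out.println("copied " + Files.size(dst) + " of " + Files.size(src) + " bytes");
  }
}
```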

Brian

[1]
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/
[2]
https://github.com/apache/beam/blob/f881a412fe85c64b1caf075160a6c0312995ea45/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/SslConsumerFactoryFn.java#L128
[3]
https://docs.oracle.com/javase/8/docs/api/java/nio/channels/FileChannel.html#transferFrom-java.nio.channels.ReadableByteChannel-long-long-

On Wed, Jun 2, 2021 at 9:00 AM Ilya Kozyrev <ilya.kozy...@akvelon.com>
wrote:

> Hi Brian,
>
>
>
> We’re using a consumerFactoryFn that reads certs from GCP and copies them
> to the local FS on each Dataflow worker.
>
> The exception is raised after consumerFactoryFn runs, when Kafka tries to
> read the certs from the local FS using KeyStore.load(InputStream is,
> String pass).
>
>
>
> This is the code we use in consumerFactoryFn to read from GCP and write
> to the local FS:
>
> try (ReadableByteChannel readerChannel =
>     FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) {
>   try (FileChannel writeChannel =
>       FileChannel.open(Paths.get(outputFilePath), options)) {
>     writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE);
>   }
> }
>
>
>
> Thank you,
>
> Ilya
>
>
>
> *From: *Brian Hulette <bhule...@google.com>
> *Reply to: *"dev@beam.apache.org" <dev@beam.apache.org>
> *Date: *Wednesday, 26 May 2021, 21:32
> *To: *dev <dev@beam.apache.org>
> *Cc: *Artur Khanin <artur.kha...@akvelon.com>
> *Subject: *Re: KafkaIO SSL issue
>
>
>
> I came across this relevant StackOverflow question:
> https://stackoverflow.com/questions/7399154/pkcs12-derinputstream-getlength-exception
>
> They say the error is from a call to `KeyStore.load(InputStream is,
> String pass);` (consistent with your stacktrace), and can occur whenever
> there's an issue with the InputStream passed to it. Who created the
> InputStream used in this case, is it Kafka code, Beam code, or your
> consumerFactoryFn?
>
>
>
> Brian
>
>
>
> On Mon, May 24, 2021 at 4:01 AM Ilya Kozyrev <ilya.kozy...@akvelon.com>
> wrote:
>
> Hi community,
>
>
> We have an issue with KafkaIO when using a secure SASL SSL connection to
> Confluent Kafka 5.5.1. When we try to configure the Kafka consumer using
> consumerFactoryFn, we hit an intermittent issue related to reading
> certificates from the file system. Intermittent means that different
> Dataflow jobs with the same parameters and certs might fail or succeed.
> The store cert types for the keystore and truststore are specified
> explicitly in the consumer config. In our case, it's JKS for both certs.
>
>
> *Stacktrace*:
>
> Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
> keystore /tmp/kafka.truststore.jks of type JKS
>       at
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:289)
>       at
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:153)
>       ... 23 more
> Caused by: java.security.cert.CertificateException: Unable to initialize,
> java.io.IOException: DerInputStream.getLength(): lengthTag=65, too big.
>       at sun.security.x509.X509CertImpl.<init>(X509CertImpl.java:198)
>       at
> sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:102)
>       at
> java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339)
>       at
> sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:755)
>       at
> sun.security.provider.JavaKeyStore$JKS.engineLoad(JavaKeyStore.java:56)
>       at
> sun.security.provider.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:224)
>       at
> sun.security.provider.JavaKeyStore$DualFormatJKS.engineLoad(JavaKeyStore.java:70)
>       at java.security.KeyStore.load(KeyStore.java:1445)
>       at
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:286)
>       ... 24 more
>
>
>
> /tmp/kafka.truststore.jks is a path that’s used in consumerFactoryFn to
> load certs from GCP to the worker's local file system.
>
>
>
> Does anyone have any ideas on how to fix this issue?
>
>
>
>
>
> Thank you,
>
> Ilya
>
>
