Oh I guess you are running the KafkaToPubsub example pipeline [1]? The code you copied is here [2]. Based on that code I think Kafka is in control of creating the InputStream, since you're just passing a file path in through the config object. So either Kafka is creating a bad InputStream (seems unlikely), or there's something wrong with /tmp/kafka.truststore.jks. Maybe it was cleaned up while Kafka is reading it, or the file is somehow corrupt?
Looking at the code you copied, I suppose it's possible you're not writing the full file to the local disk. The javadoc for transferFrom [3] states: > Fewer than the requested number of bytes will be transferred if the source channel has fewer than count bytes remaining, ** or if the source channel is non-blocking and has fewer than count bytes immediately available in its input buffer. ** Is it possible sometimes you're hitting this second case and the whole file isn't being read? I don't know if readerChannel is blocking or not. You might check by adding a log statement that prints the number of bytes that are transferred to see if that correlates with the failure. Someone else on this list may have advice on a more robust way to copy a file from remote storage. Brian [1] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/ [2] https://github.com/apache/beam/blob/f881a412fe85c64b1caf075160a6c0312995ea45/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/SslConsumerFactoryFn.java#L128 [3] https://docs.oracle.com/javase/8/docs/api/java/nio/channels/FileChannel.html#transferFrom-java.nio.channels.ReadableByteChannel-long-long- On Wed, Jun 2, 2021 at 9:00 AM Ilya Kozyrev <ilya.kozy...@akvelon.com> wrote: > Hi Brain, > > > > We’re using consumerFactoryFn that reads certs from GCP and copying those > to local FS on each Dataflow worker. > > Exception raised after consumerFactoryFn when Kafka tries to read certs > from local fs using KeyStore.load(InputStream is, String pass). > > > > This code we using in consumerFactoryFn to read from GCP and writing to > local fs : > > try (ReadableByteChannel readerChannel = > > > > FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) > { > > try (FileChannel writeChannel = > FileChannel.open(Paths.get(outputFilePath), options)) { > > writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE); > > } > > } > > > > Thank you, > > Ilya > > > > *From: *Brian Hulette <bhule...@google.com> > *Reply to: *"dev@beam.apache.org" <dev@beam.apache.org> > *Date: *Wednesday, 26 May 2021, 21:32 > *To: *dev <dev@beam.apache.org> > *Cc: *Artur Khanin <artur.kha...@akvelon.com> > *Subject: *Re: KafkaIO SSL issue > > > > I came across this relevant StackOverflow question: > https://stackoverflow.com/questions/7399154/pkcs12-derinputstream-getlength-exception > > They say the error is from a call to `KeyStore.load(InputStream is, > String pass);` (consistent with your stacktrace), and can occur whenever > there's an issue with the InputStream passed to it. Who created the > InputStream used in this case, is it Kafka code, Beam code, or your > consumerFactoryFn? > > > > Brian > > > > On Mon, May 24, 2021 at 4:01 AM Ilya Kozyrev <ilya.kozy...@akvelon.com> > wrote: > > Hi community, > > > We have an issue with KafkaIO in the case of using a secure connection > SASL SSL to the Confluent Kafka 5.5.1. When we trying to configure the > Kafka consumer using consumerFactoryFn, we have an irregular issue related > to certificate reads from the file system. Irregular means, that different > Dataflow jobs with the same parameters and certs might be failed and > succeeded. Store cert types for Keystore and Truststore are specified > explicitly in consumer config. In our case, it's JKS for both certs. > > > *Stacktrase*: > > Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL > keystore /tmp/kafka.truststore.jks of type JKS > at > org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:289) > at > org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:153) > ... 23 more > Caused by: java.security.cert.CertificateException: Unable to initialize, > java.io.IOException: DerInputStream.getLength(): lengthTag=65, too big. > at sun.security.x509.X509CertImpl.<init>(X509CertImpl.java:198) > at > sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:102) > at > java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339) > at > sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:755) > at > sun.security.provider.JavaKeyStore$JKS.engineLoad(JavaKeyStore.java:56) > at > sun.security.provider.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:224) > at > sun.security.provider.JavaKeyStore$DualFormatJKS.engineLoad(JavaKeyStore.java:70) > at java.security.KeyStore.load(KeyStore.java:1445) > at > org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:286) > ... 24 more > > > > /tmp/kafka.truststore.jks is a path that’s used in consumerFactoryFn to > load certs from GCP to the worker's local file system. > > > > Does anyone have any ideas on how to fix this issue? > > > > > > Thank you, > > Ilya > >