Hi Matt,

You're right, the KafkaSystemConsumer in 0.14.1 does not support SSL since
it uses SimpleConsumer in the BrokerProxy. The new KafkaSystemConsumer in
Samza 1.0 does.

Backporting this to 0.14.1 will be non-trivial. Can you upgrade to Samza 1.0
to pick up the new consumer? I think Xinyu already shared a Beam runner
snapshot build with 1.0 that you can try.
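
If you do upgrade, the SSL settings you already have under the
systems.kafka.consumer.* and systems.kafka.producer.* prefixes should be
passed straight through to the new Kafka clients. A rough sketch, reusing
your values (please double-check the key names against the 1.0 configuration
table):

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901
systems.kafka.consumer.security.protocol=SSL
systems.kafka.consumer.ssl.truststore.type=JKS
systems.kafka.consumer.ssl.truststore.location=/home/appuser/spp/kabini.jks
systems.kafka.consumer.ssl.truststore.password=<your truststore password>
systems.kafka.producer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901
systems.kafka.producer.security.protocol=SSL
systems.kafka.producer.ssl.truststore.type=JKS
systems.kafka.producer.ssl.truststore.location=/home/appuser/spp/kabini.jks
systems.kafka.producer.ssl.truststore.password=<your truststore password>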

- Prateek

On Mon, Mar 25, 2019 at 1:58 PM LeVeck, Matt <matt_lev...@intuit.com> wrote:

> So, this is far from definitive for SSL.  However, it is consistent with
> what we would expect from an SSL error.  We don’t get the error if we
> instantiate a full-fledged consumer or use kafka-console-consumer with SSL
> configs.  We do see the error if we try a console consumer with the
> deprecated interface of providing the ZooKeeper addresses instead of the
> broker addresses.  That, combined with the code I linked to in my previous
> message (where Samza builds a consumer that neither takes nor is passed any
> SSL configs), is why we think SSL is the issue.
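>
> Roughly, the comparison looked like this (broker, topic, and config-file
> path here are illustrative, not the exact commands we ran):
>
>   # new-consumer API with SSL client configs -- works
>   bin/kafka-console-consumer.sh \
>     --bootstrap-server sppkafka.data-lake-dev.a.intuit.com:19701 \
>     --consumer.config /home/appuser/spp/ssl-client.properties \
>     --topic __samza_checkpoint_ver_1_for_spp-kabini-transformer_42
>
>   # deprecated ZooKeeper-based consumer -- fails like the Samza job does
>   bin/kafka-console-consumer.sh \
>     --zookeeper sppzookeeper.data-lake-dev.a.intuit.com:2181 \
>     --topic __samza_checkpoint_ver_1_for_spp-kabini-transformer_42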
>
> Thanks,
>
> Matt
> 2019-03-25 20:53:41 DEBUG KafkaSystemAdmin:57 - Exception detail:
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(__samza_checkpoint_ver_1_for_spp-kabini-transformer_42)] from broker
> [ArrayBuffer(BrokerEndPoint(0,sppkafka.data-lake-dev.a.intuit.com,19701),
> BrokerEndPoint(2,sppkafka.data-lake-dev.a.intuit.com,19901),
> BrokerEndPoint(1,sppkafka.data-lake-dev.a.intuit.com,19801))] failed
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:75)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
>   at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
>   at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2$$anonfun$19.apply(KafkaSystemAdmin.scala:516)
>   at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2$$anonfun$19.apply(KafkaSystemAdmin.scala:516)
>   at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
>   at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2.apply(KafkaSystemAdmin.scala:516)
>   at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2.apply(KafkaSystemAdmin.scala:514)
>   at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
>   at org.apache.samza.system.kafka.KafkaSystemAdmin.validateStream(KafkaSystemAdmin.scala:513)
>   at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.createResources(KafkaCheckpointManager.scala:88)
>   at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:245)
>   at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:366)
>   at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:163)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>   at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103)
>   at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
>   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:61)
>   ... 20 more
>
>
>
> *From: *"LeVeck, Matt" <matt_lev...@intuit.com>
> *Date: *Monday, March 25, 2019 at 11:35 AM
> *To: *Prateek Maheshwari <prateek...@gmail.com>, "dev@samza.apache.org" <
> dev@samza.apache.org>
> *Cc: *"Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Audo, Nicholas" <
> nicholas_a...@intuit.com>
> *Subject: *Re: SSL with Samza 0.14.1?
>
>
>
> Thanks, Prateek.
>
> I’ll grab the logs shortly.  But having looked at the code a bit since my
> original email, I don’t see any way to get that config into the consumer,
> based on these:
>
> https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L77
> and
>
> https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
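>
> For illustration, the old SimpleConsumer API that BrokerProxy builds on only
> takes connection and buffer settings, so there is nowhere to hand it
> security.protocol or ssl.* properties. A minimal sketch (values are just
> placeholders, not our actual settings):
>
>   import kafka.consumer.SimpleConsumer
>
>   // the full set of knobs the old consumer constructor exposes
>   val consumer = new SimpleConsumer(
>     "sppkafka.data-lake-dev.a.intuit.com", // host
>     19701,                                 // port
>     30000,                                 // soTimeout (ms)
>     1024 * 1024,                           // bufferSize (bytes)
>     "samza-checkpoint-client")             // clientId
>   // connects over a plaintext BlockingChannel; no SSL hooks at all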
>
> -Matt
>
>
>
> *From: *Prateek Maheshwari <prateek...@gmail.com>
> *Date: *Monday, March 25, 2019 at 10:02 AM
> *To: *"dev@samza.apache.org" <dev@samza.apache.org>
> *Cc: *"Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "LeVeck, Matt" <
> matt_lev...@intuit.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>
> *Subject: *Re: SSL with Samza 0.14.1?
>
>
>
> Hi Matt,
>
>
>
> It's possible that the old Kafka AdminClient does not support SSL for ZK
> out of the box. I'll check if this is the case, and if this is something
> that can be configured.
>
>
>
> In the meantime, can you tell us the following:
>
> 1. Kafka broker version you're running.
>
> 2. Kafka client version for the job.
>
> 3. Stack trace where you see the SSL connect errors.
>
>
>
> Thanks,
>
> Prateek
>
>
>
>
>
>
>
> On Mon, Mar 25, 2019 at 9:47 AM Prateek Maheshwari <prateek...@gmail.com>
> wrote:
>
> Forwarding again. Original email did not show up on the OSS mailing list.
>
> ---------- Forwarded message ---------
> From: *Deshpande, Omkar* <omkar_deshpa...@intuit.com>
> Date: Fri, Mar 22, 2019 at 5:08 PM
> Subject: Fwd: SSL with Samza 0.14.1?
> To: prateek...@gmail.com <prateek...@gmail.com>
>
>
>
> ++Prateek gmail
> ------------------------------
>
> *From:* LeVeck, Matt
> *Sent:* Thursday, March 21, 2019 10:33:11 PM
> *To:* dev@samza.apache.org; pmaheshw...@linkedin.com; Deshpande, Omkar;
> Audo, Nicholas
> *Subject:* SSL with Samza 0.14.1?
>
>
>
> Prateek, Samza dev team,
>
>     This is Matt from Intuit.  We met briefly at the beginning of this
> week’s meetup.  I’m wondering if you could give us some guidance on
> Kafka SSL with Samza.  Here, I’m talking about the Kafka cluster that Samza
> uses to store checkpoints, etc.  We’re trying to connect to a cluster that
> has SSL enabled, and we’re getting some errors that are indicative of SSL
> connectivity failing.  It might just be that our properties file isn’t
> correct, but we’re wondering if there is another possibility.  The
> configuration docs indicate that Samza 0.14.1 uses Kafka 0.11, which should
> have SSL support, but Samza 0.14.1 also requires access to ZooKeeper for its
> consumer client, which is indicative of older clients (see
> https://samza.apache.org/learn/documentation/0.14/jobs/configuration-table.html#kafka).
> Is it possible that Samza 0.14.1 doesn’t support SSL for Kafka when
> creating its checkpoint topics?
>
> Anyway, I’m hoping that’s not the case and that either our config is wrong or
> we’re doing something else wrong.  Here is our properties snippet in case
> we’ve messed up the config key names.  Any guidance is appreciated.
>
>
> # Kafka System
>
> systems.kafka.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181,sppzookeeper.data-lake-dev.a.intuit.com:2182,sppzookeeper.data-lake-dev.a.intuit.com:2183
>
> systems.kafka.security.protocol=SSL
>
> systems.kafka.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
>
> systems.kafka.ssl.truststore.type=JKS
>
> systems.kafka.ssl.truststore.location=/home/appuser/spp/kabini.jks
>
> systems.kafka.ssl.truststore.password=Intuit01
>
> systems.kafka.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901
>
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>
>
>
> We’ve also tried adding producer- and consumer-specific entries:
>
>
>
> systems.kafka.producer.security.protocol=SSL
>
> systems.kafka.producer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
>
> systems.kafka.producer.ssl.truststore.type=JKS
>
> systems.kafka.producer.ssl.truststore.location=/home/appuser/spp/kabini.jks
>
> systems.kafka.producer.ssl.truststore.password=Intuit01
>
> systems.kafka.producer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901
>
> systems.kafka.consumer.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181,sppzookeeper.data-lake-dev.a.intuit.com:2182,sppzookeeper.data-lake-dev.a.intuit.com:2183
>
> systems.kafka.consumer.security.protocol=SSL
>
> systems.kafka.consumer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
>
> systems.kafka.consumer.ssl.truststore.type=JKS
>
> systems.kafka.consumer.ssl.truststore.location=/home/appuser/spp/kabini.jks
>
> systems.kafka.consumer.ssl.truststore.password=Intuit01
>
> systems.kafka.consumer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901
>
> systems.kafka.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181,sppzookeeper.data-lake-dev.a.intuit.com:2182,sppzookeeper.data-lake-dev.a.intuit.com:2183
>
> systems.kafka.security.protocol=SSL
>
> systems.kafka.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
>
> systems.kafka.ssl.truststore.type=JKS
>
> systems.kafka.ssl.truststore.location=/home/appuser/spp/kabini.jks
>
> systems.kafka.ssl.truststore.password=Intuit01
>
> systems.kafka.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901
>
> Thanks,
>
> Matt
>
>
