arunpandianp commented on PR #34258:
URL: https://github.com/apache/beam/pull/34258#issuecomment-2799855776

   Pipeline submission fails, when configuring KafkaIO as below. The machine 
submitting the job does not have access to the kafka cluster. The job works on 
2.64.0 and fails on 2.65.0-SNAPSHOT. Likely this change broke it.
   
   ```
   PCollection<KafkaRecord<String, String>> kafkaMessages =
           pipeline.apply(
               "Read from Kafka",
               KafkaIO.<String, String>read()
                   .withBootstrapServers(options.getBootstrapServers())
                   .withTopicPartitions(
                       IntStream.range(0, 100)
                           .mapToObj(i -> new 
TopicPartition(options.getKafkaTopic(), i))
                           .collect(Collectors.toList()))
                   .withGCPApplicationDefaultCredentials()
                   .withKeyDeserializer(StringDeserializer.class)
                   .withValueDeserializer(StringDeserializer.class)
                   .withProcessingTime())
   ```
   
   
   ```
   Exception in thread "main" java.lang.RuntimeException: 
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at 
org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:55)
        at 
org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:2347)
        at 
org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:2342)
        at 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:503)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
        at 
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)
        at 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:435)
        at 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:189)
        at 
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:1385)
        at 
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:216)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
        at <testclass>.main(testclass)
   Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
consumer
        at 
org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:264)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:543)
        at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:111)
        at 
org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:96)
        at 
org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:52)
        ... 18 more
   Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
bootstrap urls given in bootstrap.servers
        at 
org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:103)
        at 
org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:62)
        at 
org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
        at 
org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:183)
        ... 25 more
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to