arunpandianp commented on PR #34258:
URL: https://github.com/apache/beam/pull/34258#issuecomment-2799855776
Pipeline submission fails, when configuring KafkaIO as below. The machine
submitting the job does not have access to the kafka cluster. The job works on
2.64.0 and fails on 2.65.0-SNAPSHOT. Likely this change broke it.
```
PCollection<KafkaRecord<String, String>> kafkaMessages =
pipeline.apply(
"Read from Kafka",
KafkaIO.<String, String>read()
.withBootstrapServers(options.getBootstrapServers())
.withTopicPartitions(
IntStream.range(0, 100)
.mapToObj(i -> new
TopicPartition(options.getKafkaTopic(), i))
.collect(Collectors.toList()))
.withGCPApplicationDefaultCredentials()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withProcessingTime())
```
```
Exception in thread "main" java.lang.RuntimeException:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:55)
at
org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:2347)
at
org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:2342)
at
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:503)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)
at
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:435)
at
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:189)
at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:1385)
at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:216)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at <testclass>.main(testclass)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka
consumer
at
org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:264)
at
org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:543)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:111)
at
org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:96)
at
org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:52)
... 18 more
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable
bootstrap urls given in bootstrap.servers
at
org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:103)
at
org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:62)
at
org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
at
org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:183)
... 25 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]