[
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Boyuan Zhang updated BEAM-12494:
--------------------------------
Resolution: Won't Fix
Status: Resolved (was: Open)
> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.28.0
> Environment: IntelliJ community version, Maven, Windows, Dataflow
> version 2.28.0
> Reporter: Jasminder pal singh sehgal
> Priority: P2
> Fix For: Not applicable
>
> Attachments: CodeSnippet.JPG, LogsStreamingEngine.txt,
> SuccessfulJobRun-KafkaIngestion.txt, TimeOutLogs_KafkaIngestion.txt,
> image-2021-06-16-16-54-25-161.png, image-2021-06-16-16-55-57-509.png,
> image-2021-06-20-22-20-24-363.png, image-2021-06-20-22-23-14-052.png,
> image-2021-06-21-15-00-09-851.png
>
>
> Hello,
> Our team is facing an issue when launching a streaming Dataflow Kafka job
> through IntelliJ, where Kafka is hosted on a private subnet.
> The hypothesis is that during graph construction time [0], Beam tries to
> execute parts of the code locally and verify all the connections. In our case,
> we don't have access to the subnet from IntelliJ or from the Cloud console; we
> only have access when a Compute Engine instance is created within that subnet.
> We reached out to Google support and they suggested that we raise a defect with
> you. The following code throws a *time-out* error when we execute it through
> IntelliJ.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>     .withConsumerConfigUpdates(propertyBuilder)
>     .withConsumerConfigUpdates(
>         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
>     .withBootstrapServers(options.getBootstrapServers())
>     .withTopics(topicsList)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .commitOffsetsInFinalize()
>     // .withMaxNumRecords(5)
> );
> {code}
> However, if we uncomment
> {code:java}
> .withMaxNumRecords(){code}
> the code works perfectly and we are able to spin up a Dataflow job in the
> desired subnet to ingest the Kafka stream.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>     .withConsumerConfigUpdates(propertyBuilder)
>     .withConsumerConfigUpdates(
>         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
>     .withBootstrapServers(options.getBootstrapServers())
>     .withTopics(topicsList)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .commitOffsetsInFinalize()
>     .withMaxNumRecords(5)
> );
> {code}
> The issue with the above code is that the Dataflow job stops after ingesting
> the given number of records and behaves like a batch ingestion instead of a
> streaming one, which we don't want.
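> To illustrate the batch-versus-streaming difference, here is a minimal sketch
> (reusing the `options`, `topicsList`, and deserializers from the snippets
> above; this is illustrative, not our production code) showing how
> `withMaxNumRecords` changes the boundedness of the resulting PCollection:
> {code:java}
> // Minimal sketch: the boundedness of the output PCollection is what makes the
> // job behave like batch ingestion when withMaxNumRecords is set.
> PCollection<KafkaRecord<String, String>> records =
>     pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>         .withBootstrapServers(options.getBootstrapServers())
>         .withTopics(topicsList)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .withMaxNumRecords(5));
> // Prints BOUNDED when withMaxNumRecords is set, UNBOUNDED when it is left out.
> System.out.println(records.isBounded());
> {code}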
> *Google support team hypothesis:*
> The current hypothesis is that the issue happens in
> `KafkaUnboundedSource.split()` [1], which fails because it is unable to get
> topic information.
> The first point is that `withMaxNumRecords` is intended for testing [2]; when
> it is specified, the unbounded read is converted into a bounded read by
> `BoundedReadFromUnboundedSource` [3], but without `withMaxNumRecords` the
> pipeline remains unbounded.
> When the pipeline is bounded (i.e. `withMaxNumRecords` is set), the `split()`
> happens on the Dataflow worker in `SplitFn` [4]. Since it ran on Dataflow, it
> had no issue connecting to Kafka.
> But when the pipeline is unbounded (`withMaxNumRecords` commented out),
> `split()` is called while the pipeline is built locally at the graph
> construction phase [5][6], at which point it does not have access to Kafka.
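> For additional context on why the local `split()` needs network access, below
> is a simplified, illustrative sketch (not the actual Beam source; the broker
> address and topic are placeholders) of the partition lookup that
> `KafkaUnboundedSource.split()` has to perform before it can assign partitions
> to readers. The `partitionsFor()` call is the kind of metadata request that
> times out when the submitting machine cannot reach the brokers:
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.Consumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class PartitionLookupSketch {
>   public static void main(String[] args) {
>     // Placeholder broker address; in the failing case this points into the
>     // private subnet that the submitting machine cannot reach.
>     Map<String, Object> config = new HashMap<>();
>     config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-in-private-subnet:9092");
>     config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>     config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>
>     List<String> topics = Arrays.asList("my-topic"); // placeholder topic
>     List<TopicPartition> partitions = new ArrayList<>();
>     try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
>       for (String topic : topics) {
>         // partitionsFor() requires a live connection to the cluster; from a
>         // machine outside the subnet this request is what times out.
>         for (PartitionInfo info : consumer.partitionsFor(topic)) {
>           partitions.add(new TopicPartition(info.topic(), info.partition()));
>         }
>       }
>     }
>     // split() would then distribute these TopicPartitions across the desired
>     // number of source splits.
>     System.out.println("Discovered partitions: " + partitions);
>   }
> }
> {code}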
>
> [0] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
> [1] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57]
> [2] [https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-]
> [3] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193]
> [4] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169]
> [5] [https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87]
> [6] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)