Jasminder pal singh sehgal created BEAM-12494:
-------------------------------------------------

             Summary: Dataflow Kafka Job not triggering for external subnet
                 Key: BEAM-12494
                 URL: https://issues.apache.org/jira/browse/BEAM-12494
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
    Affects Versions: 2.28.0
         Environment: IntelliJ community version, Maven, Windows, Dataflow 
version 2.28.0
            Reporter: Jasminder pal singh sehgal
             Fix For: Not applicable
         Attachments: SuccessfulJobRun-KafkaIngestion.txt, 
TimeOutLogs_KafkaIngestion.txt

Hello,

Our team is having trouble launching a streaming Dataflow Kafka job from 
IntelliJ when the job targets a private subnet. 

Our hypothesis is that at graph construction time [0], Beam executes part of 
the pipeline code locally and checks all connections. In our case, we cannot 
reach the subnet from IntelliJ or from the Cloud console. We do have access 
from a Compute Engine instance created within that subnet.
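
One way to confirm the connectivity gap from the machine running IntelliJ is a plain TCP probe against each bootstrap server. The sketch below is illustrative only: the class `BrokerReachability`, its method names, the default `localhost:9092`, and the 3-second timeout are assumptions and are not part of Beam or Kafka. A successful TCP connect also does not prove that Kafka metadata requests will succeed; it only rules out basic network reachability problems.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks whether bootstrap servers accept TCP connections from this machine. */
final class BrokerReachability {

    /** Splits a "host1:9092,host2:9092" string into trimmed, non-empty "host:port" entries. */
    static List<String> parseBootstrapServers(String bootstrapServers) {
        List<String> entries = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return entries;
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // The host name is resolved here; an unresolvable host makes connect() throw.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default; pass the real bootstrap servers as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (String entry : parseBootstrapServers(servers)) {
            int colon = entry.lastIndexOf(':');
            if (colon < 0) {
                System.out.println(entry + " skipped: expected host:port");
                continue;
            }
            String host = entry.substring(0, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            System.out.println(entry + " reachable: " + isReachable(host, port, 3000));
        }
    }
}
```

Running this once from the local machine and once from a Compute Engine instance inside the subnet should show whether the two environments really differ in their ability to reach the brokers.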

We reached out to Google support, and they suggested that we raise a defect 
with you. The following code throws a *time-out* error when we run it from 
IntelliJ.
{code:java}
pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
        .withConsumerConfigUpdates(propertyBuilder)
        .withConsumerConfigUpdates(
                ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
        )
        .withBootstrapServers(options.getBootstrapServers())
        .withTopics(topicsList)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
       // .withMaxNumRecords(5)
)
{code}
However, if we uncomment
{code:java}
.withMaxNumRecords(5)
{code}
the code works, and we are able to start a Dataflow job in the desired subnet 
to ingest the Kafka stream.
{code:java}
pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
        .withConsumerConfigUpdates(propertyBuilder)
        .withConsumerConfigUpdates(
                ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
        )
        .withBootstrapServers(options.getBootstrapServers())
        .withTopics(topicsList)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
        .withMaxNumRecords(5)
)
{code}
The issue with the above code is that the Dataflow job stops after ingesting 
the given number of records and behaves like a batch ingestion instead of a 
streaming one, which we don't want.

 

Google support team hypothesis:

The current hypothesis is that the failure occurs in 
`KafkaUnboundedSource.split()` [1], which fails because it cannot retrieve 
topic information.

First, `withMaxNumRecords` is intended for testing [2]. When it is specified, 
the unbounded read is converted into a bounded read via 
`BoundedReadFromUnboundedSource` [3]; without `withMaxNumRecords`, the 
pipeline remains unbounded.

When the pipeline is bounded (`withMaxNumRecords` specified), `split()` runs 
on the Dataflow worker in `SplitFn` [4]. Because it runs on Dataflow, it has 
no trouble connecting to Kafka.

But when the pipeline is unbounded (`withMaxNumRecords` commented out), 
`split()` is called locally while the pipeline is being built, in the graph 
construction phase [5][6], at which point it has no access to Kafka.
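
The difference between the two paths in this hypothesis can be sketched with a toy model. Nothing below is Beam code: `SplitTimingModel`, `ToySource`, `constructUnbounded`, and `constructBounded` are hypothetical names, and the model only illustrates eager versus deferred invocation of `split()`, assuming the hypothesis above is correct.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Toy model only: none of these types exist in Beam. */
final class SplitTimingModel {

    /** Stand-in for a source whose split() must contact the broker. */
    static final class ToySource {
        private final BooleanSupplier brokerReachable;

        ToySource(BooleanSupplier brokerReachable) {
            this.brokerReachable = brokerReachable;
        }

        List<String> split() {
            // Simulates the metadata lookup that times out outside the subnet.
            if (!brokerReachable.getAsBoolean()) {
                throw new IllegalStateException("timed out fetching topic metadata");
            }
            return Arrays.asList("partition-0", "partition-1");
        }
    }

    /** Models the unbounded path: split() runs immediately, on the submitting machine. */
    static List<String> constructUnbounded(ToySource source) {
        return source.split();
    }

    /** Models the bounded path: construction only records the step; split() runs when invoked later. */
    static Supplier<List<String>> constructBounded(ToySource source) {
        return source::split;
    }
}
```

In this model, `constructBounded` succeeds even when the broker is unreachable at construction time, because the lookup is postponed until the returned supplier is invoked (on the worker, in the real scenario), whereas `constructUnbounded` fails immediately.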

 

[0] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job
[1] https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57
[2] https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-
[3] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193
[4] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169
[5] https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87
[6] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
