[ https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jasminder pal singh sehgal updated BEAM-12494:
----------------------------------------------
    Description: 
Hello,

Our team is facing an issue launching a streaming Dataflow Kafka job from IntelliJ; the Kafka cluster is hosted on a private subnet.

The hypothesis is that during graph construction time [0], Beam locally tries to execute the code and verify all the connections. In our case, we don't have access to the subnet through IntelliJ or through the Cloud Console; we only have access when a Compute Engine instance is created within that subnet.

We reached out to Google support and they suggested that we raise a defect with you. The following code throws a *time-out* error when we execute it through IntelliJ.
{code:java}
pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
        .withConsumerConfigUpdates(propertyBuilder)
        .withConsumerConfigUpdates(
                ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
        )
        .withBootstrapServers(options.getBootstrapServers())
        .withTopics(topicsList)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
       // .withMaxNumRecords(5)
)
{code}
But if we uncomment
{code:java}
.withMaxNumRecords(5){code}
the code works perfectly and we are able to spin up a Dataflow job in the desired subnet to ingest the Kafka stream.
{code:java}
pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
        .withConsumerConfigUpdates(propertyBuilder)
        .withConsumerConfigUpdates(
                ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
        )
        .withBootstrapServers(options.getBootstrapServers())
        .withTopics(topicsList)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
        .withMaxNumRecords(5)
)
{code}
The issue with the above code is that Dataflow will stop after ingesting the given number of records and behave like batch ingestion instead of streaming, which we don't want.

*Google support team hypothesis:*

The current hypothesis is that the issue is happening in `KafkaUnboundedSource.split()` [1], which fails because it is unable to get topic information.
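
For context, here is a minimal sketch (not code from this report; the broker address, topic name, and timeout are placeholders) of the kind of topic-metadata lookup that has to succeed from the submitting machine, done with a plain Kafka consumer outside of Beam:
{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BrokerReachabilityCheck {
    public static void main(String[] args) {
        // Placeholder broker address, topic, and timeout; substitute real values.
        Map<String, Object> config = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker.in.private.subnet:9092",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 15000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            // Listing partitions requires reaching the brokers, which is the same kind
            // of metadata lookup KafkaUnboundedSource.split() performs [1].
            List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");
            System.out.println("Partitions visible: " + partitions);
        }
    }
}
{code}
If this call times out from the IntelliJ machine but succeeds from a Compute Engine instance inside the subnet, that is consistent with the hypothesis above.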

The first point is that `withMaxNumRecords` is intended for testing [2]; when it is specified, the unbounded read is converted into a bounded read by `BoundedReadFromUnboundedSource` [3], but without `withMaxNumRecords` the pipeline is still unbounded.
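
As an illustration of that point (a sketch only, not code from this report; the bootstrap server and topic are placeholders), the effect of `withMaxNumRecords` can be seen on the resulting collection without contacting Kafka at all:
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundednessCheck {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        PCollection<KafkaRecord<String, String>> records =
                pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
                        .withBootstrapServers("broker.in.private.subnet:9092") // placeholder
                        .withTopic("my-topic")                                 // placeholder
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withMaxNumRecords(5));

        // Prints BOUNDED: the read was wrapped in BoundedReadFromUnboundedSource [3].
        // Remove withMaxNumRecords and it prints UNBOUNDED, i.e. a true streaming read.
        System.out.println(records.isBounded());
    }
}
{code}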

When the read is bounded (i.e., when `withMaxNumRecords` is specified), `split()` happens on a Dataflow worker in `SplitFn` [4]. Since it ran on Dataflow, it did not have an issue connecting to Kafka.

But when the read is unbounded (`withMaxNumRecords` commented out), `split()` is called while the pipeline is built locally during the graph construction phase [5][6], at which point it does not have access to Kafka.
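
A minimal sketch of that construction-time call path (again not from this report; it uses Beam's `CountingSource` as a stand-in for the Kafka source, and the split count of 20 is an arbitrary illustrative value):
{code:java}
import java.util.List;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ConstructionTimeSplit {
    public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();

        // Stand-in unbounded source; for KafkaIO the source's split() needs to reach
        // the brokers to list topic partitions [1].
        UnboundedSource<Long, ?> source = CountingSource.unbounded();

        // This is the kind of call the Dataflow runner makes on the submitting machine
        // while translating an unbounded read [5]; for Kafka this is where the time-out
        // occurs when the brokers are only reachable from inside the private subnet.
        List<? extends UnboundedSource<Long, ?>> splits = source.split(20, options);
        System.out.println("Initial splits: " + splits.size());
    }
}
{code}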

 

[0] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job
[1] https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57
[2] https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-
[3] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193
[4] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169
[5] https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87
[6] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job



> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
>                 Key: BEAM-12494
>                 URL: https://issues.apache.org/jira/browse/BEAM-12494
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.28.0
>         Environment: IntelliJ community version, Maven, Windows, Dataflow 
> version 2.28.0
>            Reporter: Jasminder pal singh sehgal
>            Priority: P2
>             Fix For: Not applicable
>
>         Attachments: CodeSnippet.JPG, SuccessfulJobRun-KafkaIngestion.txt, 
> TimeOutLogs_KafkaIngestion.txt
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
