Mauro Riva created BEAM-11993:
---------------------------------

             Summary: ReadFromKafka doesn’t send data to the next PTransform –  
Apache Flink "Cluster" – Apache Beam Python SDK
                 Key: BEAM-11993
                 URL: https://issues.apache.org/jira/browse/BEAM-11993
             Project: Beam
          Issue Type: Bug
          Components: beam-model, cross-language, io-py-kafka, runner-flink
    Affects Versions: 2.28.0, 2.27.0, 2.26.0
            Reporter: Mauro Riva


I am trying to build a streaming pipeline using Python. The pipeline should 
subscribe to a Kafka topic and process the data on the fly. I am using the 
following configuration:
{code:java}
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


class PrintFn(beam.DoFn):
    """Logs every element with its window and timestamp, then passes it on."""

    def __init__(self, label):
        self.label = label

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam):
        logging.info("[%s]: %s %s %s", self.label, element, window, timestamp)
        yield element

[...]

pipeline_args = [
    "--job_endpoint=localhost:8099",
    "--runner=PortableRunner",
    "--environment_type=DOCKER",
    "--environment_config=gcr.io/xxxx/beam_python3.7_sdk:v2.28.0-custom",
    "--enable_streaming_engine",
]
pipeline_options = PipelineOptions(
    pipeline_args, save_main_session=True, streaming=True)
DataPipeline = beam.Pipeline(options=pipeline_options)
ReadData = (
    DataPipeline
    | "ReadFromKafka"
    >> ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "10.0.1.40:9092",
            "auto.offset.reset": "latest",
        },
        topics=["beam_topic"],  # topics is documented as a list of topic names
        expansion_service="localhost:8097",
    )
    | "Debug"
    >> beam.ParDo(PrintFn(label="test"))
)
{code}
together with a Flink cluster consisting of a JobManager and TaskManagers. The 
pipeline is submitted, but as soon as it starts running, the task: 
{code:java}
Source: Impulse -> [3]ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/{ParDo(OutputSingleSource), ParDo(UnboundedSourceAsSDFWrapper)}
{code}
changes its status from RUNNING to FINISHED. The Kafka consumer remains 
subscribed and reports the following:
{code:java}
2021-03-16 16:10:54,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0

2021-03-16 16:10:54,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 144.

2021-03-16 16:10:55,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0

2021-03-16 16:10:55,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 145.
{code}
But it doesn’t send any data to the next task:
{code:java}
[3]ReadFromKafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Debug
{code}
which remains in the RUNNING state and never logs an element.
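
To make the offset movement in the consumer log above easier to see, here is a minimal sketch that extracts the reset offsets from those lines. The helper name reset_offsets and the regular expression are illustrative assumptions, not part of Beam or Kafka; the sample text is copied from the log excerpt quoted above.

```python
import re

# Hypothetical helper, not part of Beam or Kafka: matches the
# "Resetting offset for partition <name> to offset <n>" consumer log message.
# \s+ is used between words so that line-wrapped log text still matches.
RESET_RE = re.compile(
    r"Resetting\s+offset\s+for\s+partition\s+(\S+)\s+to\s+offset\s+(\d+)")


def reset_offsets(log_text):
    """Return (partition, offset) tuples in the order they appear in the log."""
    return [(m.group(1), int(m.group(2))) for m in RESET_RE.finditer(log_text)]


# The two "Resetting offset" entries from the report, one per line.
SAMPLE_LOG = (
    "2021-03-16 16:10:54,629 INFO "
    "org.apache.kafka.clients.consumer.internals.SubscriptionState [] - "
    "[Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, "
    "groupId=Reader-0_offset_consumer_538555605_none] "
    "Resetting offset for partition topic_beam-0 to offset 144.\n"
    "2021-03-16 16:10:55,629 INFO "
    "org.apache.kafka.clients.consumer.internals.SubscriptionState [] - "
    "[Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, "
    "groupId=Reader-0_offset_consumer_538555605_none] "
    "Resetting offset for partition topic_beam-0 to offset 145.\n"
)

offsets = reset_offsets(SAMPLE_LOG)
```

In the quoted log the latest offset of topic_beam-0 moves from 144 to 145 within one second, which suggests that new records keep arriving at the broker while the Debug step logs nothing.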

Changing the configuration to:
{code:java}
    | "ReadFromKafka"
    >> ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "10.0.1.40:9092",
            "auto.offset.reset": "earliest",
        },
        topics=["beam_topic"],  # topics is documented as a list of topic names
        max_num_records=10,  # stop reading after 10 records
        expansion_service="localhost:8097",
    )
    | "Debug"
    >> beam.ParDo(PrintFn(label="test"))
)
{code}
seems to work, but only for the first max_num_records records (here 
{color:#FF0000}10{color}), and only for records that are already available in 
the broker. For those, I get the INFO log output as expected:
{code:java}
2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000)
2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000)
{code}
After reading those messages, the complete pipeline (both mentioned tasks) 
changes its status to FINISHED (as expected).
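
For reference, each logged element is a (key, value) tuple of bytes, and the value looks like a JSON object of the form {"type": "Buffer", "data": [...]}. The following is a minimal sketch of turning such a value back into raw bytes inside a downstream step. The function name buffer_json_to_bytes is hypothetical, and the assumption that every value has this shape is based only on the two log lines above.

```python
import json


def buffer_json_to_bytes(value):
    """Decode b'{"type":"Buffer","data":[...]}' into the raw bytes it lists.

    Hypothetical helper: assumes the value is a JSON object with a "type"
    of "Buffer" and a "data" list of integers in the range 0..255.
    """
    obj = json.loads(value)  # json.loads accepts bytes as well as str
    if obj.get("type") != "Buffer":
        raise ValueError("not a Buffer-style JSON object")
    return bytes(obj["data"])


# Element copied from the log output in the report.
key, value = (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}')
payload = buffer_json_to_bytes(value)
```

A function like this could be applied with beam.Map after the read step, but that placement is a suggestion and is not part of the reported pipeline.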
  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
