Jerryporter opened a new issue, #32709:
URL: https://github.com/apache/beam/issues/32709

   ### What happened?
   
   I am working on a pipeline that reads from Kafka with Apache Beam and uses Flink as the runner. The pipeline works fine for small, bounded datasets (e.g., the WordCount example), but when I switch to Kafka to process an unbounded stream, no output is generated. The job is submitted without obvious errors, yet no Kafka messages are processed.
   
   **QUESTION:**
   
   - Is any specific configuration required for Beam's Kafka integration to work with Flink?
   - Do I need to handle the unbounded stream differently, for example with specific windowing or trigger functions, so that each Kafka message is processed immediately?
   - What additional debugging steps should I take to find out why Kafka messages are not being consumed?
   - Could you provide an example of Kafka working correctly with Flink?
   
   
   Kafka is running inside Docker, and the Flink cluster is started locally with `start-cluster.sh`. The job is submitted successfully, but the Kafka messages are not consumed or processed as expected. The relevant code and additional details are below.
   
   ```python
   import argparse
   import logging

   import apache_beam as beam
   from apache_beam.io.kafka import ReadFromKafka
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.options.pipeline_options import SetupOptions


   class ParseKafkaMessage(beam.DoFn):
       def process(self, kafka_message):
           # ReadFromKafka emits (key, value) tuples; the value holds the image path.
           image_path = kafka_message[1].decode('utf-8')
           yield {"image_path": image_path}


   def map_message_image_path(result):
       return result['image_path']


   def print_message(record):
       # record is a (key, value) tuple of bytes, so decode only the value.
       print(f"Received message: {record[1].decode('utf-8')}")
       return record


   def run(argv=None, save_main_session=True):
       """
       Args:
         argv: Command line arguments defined for this example.
       """
       parser = argparse.ArgumentParser()
       parser.add_argument('--bootstrap_servers', dest='bootstrap_servers',
                           required=True, help='Kafka bootstrap servers.')
       parser.add_argument('--topic', dest='topic', required=True,
                           help='Kafka topic to consume from.')
       known_args, pipeline_args = parser.parse_known_args(argv)

       logging.getLogger().setLevel(logging.INFO)
       print('start!')

       CONSUMER_CONFIG = {
           "bootstrap.servers": known_args.bootstrap_servers,
           # "auto.offset.reset": "earliest",
           'group.id': 'temp_group'
       }

       CONSUMER_TOPICS = [known_args.topic]

       # Runner flags plus any remaining command-line pipeline arguments;
       # save_main_session is set on these final options so it is not lost.
       pipeline_options = PipelineOptions(pipeline_args + [
           "--runner=FlinkRunner",
           "--flink_master=localhost:8081",
           "--environment_type=LOOPBACK",
       ])
       pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

       with beam.Pipeline(options=pipeline_options) as pipeline:
           consumed = (
               pipeline
               | ReadFromKafka(
                   consumer_config=CONSUMER_CONFIG,
                   topics=CONSUMER_TOPICS,
                   # max_num_records=3,
                   # max_read_time=60,
               )
               # | beam.Map(print)
               # | beam.Map(convert_kafka_record_to_dictionary)
               | beam.Map(print_message))


   if __name__ == '__main__':
       # e.g. python kafka-beam.py --bootstrap_servers localhost:9092 --topic test
       run()
   ```
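For reference, here is a minimal, stdlib-only sketch of the decoding step. It assumes, as `ParseKafkaMessage` above already does by reading `kafka_message[1]`, that each element `ReadFromKafka` emits with its default byte-array deserializers is a `(key, value)` tuple of `bytes`, and that the key may be `None` for messages produced without a key (as `kafka-console-producer` does by default). The helper name `format_kafka_record` and the sample path are hypothetical.

```python
from typing import Optional, Tuple


def format_kafka_record(record: Tuple[Optional[bytes], bytes]) -> str:
    """Turn a (key, value) Kafka record into a printable line.

    Hypothetical helper: assumes key and value are UTF-8 encoded bytes
    and that the key is None when the producer did not set one.
    """
    key, value = record
    key_text = key.decode("utf-8") if key is not None else "<no key>"
    return f"Received message: key={key_text} value={value.decode('utf-8')}"


if __name__ == "__main__":
    # Hypothetical sample record, shaped like one sent without a key.
    print(format_kafka_record((None, b"/data/images/cat.jpg")))
```

In a pipeline it could be used as `| beam.Map(lambda record: print(format_kafka_record(record)))`; keeping the formatting in a plain function makes it testable without starting Flink.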
   
   **Steps to reproduce:**
   
   1. Start a local Flink cluster (`start-cluster.sh`) and a Kafka broker in Docker.
   2. Create a Kafka topic (`test`) and publish some messages containing image paths.
   3. Run the Beam pipeline above to consume and process the messages from Kafka.
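Because step 2 publishes the messages before the job starts, the consumer's starting offset also matters, independently of the container failure in the log. For a consumer group with no committed offset, Kafka's `auto.offset.reset` defaults to `latest`, so messages already in the topic are skipped unless it is set to `earliest` (the line commented out in `CONSUMER_CONFIG`). The sketch below builds the config from command-line arguments; the keys are standard Kafka consumer properties, while the helper `build_consumer_config` and the `--group_id` / `--offset_reset` flags are hypothetical.

```python
import argparse
from typing import Dict, List, Optional


def build_consumer_config(argv: Optional[List[str]] = None) -> Dict[str, str]:
    """Build a Kafka consumer config dict from command-line arguments.

    Hypothetical helper; the returned keys are standard Kafka consumer properties.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bootstrap_servers", required=True)
    parser.add_argument("--group_id", default="temp_group")
    # Start from the oldest retained message when the group has no committed
    # offset, so messages produced before the job started are not skipped.
    parser.add_argument("--offset_reset", default="earliest",
                        choices=["earliest", "latest"])
    # Unrecognized arguments (e.g. Beam pipeline flags) are left for Beam.
    known_args, _ = parser.parse_known_args(argv)
    return {
        "bootstrap.servers": known_args.bootstrap_servers,
        "group.id": known_args.group_id,
        "auto.offset.reset": known_args.offset_reset,
    }


if __name__ == "__main__":
    print(build_consumer_config(["--bootstrap_servers", "localhost:9092"]))
```

The returned dict can be passed as `consumer_config=` to `ReadFromKafka`; since `parse_known_args` ignores flags it does not know, pipeline flags such as `--runner=FlinkRunner` can share the same command line.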
   
   **Expected behavior:**
   
   The Kafka messages should be consumed by the Beam pipeline, and `print_message()` should print each message to the console.
   
   
   **Environment:**

   - Apache Beam version: 2.59.0
   - Flink version: 1.18.1
   - Kafka version: latest Docker image
   - Runner: FlinkRunner
   
   
   **Actual behavior:**
   
   The Flink job is submitted without errors, but the Kafka messages are never consumed or processed, and no output is generated. I have verified that Kafka is working correctly and that messages are available in the topic. The job eventually fails with `java.lang.IllegalStateException: No container running` (see the log files below).
   
   
   **What I have tried:**
   
   - Kafka consumer testing: Kafka works as expected. I can produce and consume messages with the standalone Kafka utilities (`kafka-console-producer`, `kafka-console-consumer`).
   - Flink job testing: the WordCount example runs successfully with FlinkRunner.
   - Print statements: I added `print_message()` to print the Kafka messages, but nothing is printed.
   
   **Log files:**
   ```log
   INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.59.0/beam-sdks-java-io-expansion-service-2.59.0.jar
   INFO:root:Starting a JAR-based expansion service from JAR /home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar
   INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar' '51799' '--filesToStage=/home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar' '--alsoStartLoopbackWorker']
   INFO:apache_beam.utils.subprocess_server:WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
   ......
   INFO:apache_beam.utils.subprocess_server:INFO: Source: Impulse (1/1) (359af0cc3ed2f4c625e558bebd61c64b_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
   INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
   INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:43293.
   INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
   INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService control
   INFO:apache_beam.utils.subprocess_server:INFO: Beam Fn Control client connected with id 1-2
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
   INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
   INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
   INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
   INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
   INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
   INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
   INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
   INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:43 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
   INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:48 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
   INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:54 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
   INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:59 AM 
   ....
   INFO:apache_beam.utils.subprocess_server:SEVERE: Docker container a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5 logs:
   INFO:apache_beam.utils.subprocess_server:2024/10/08 03:32:38 Failed to obtain provisioning information: failed to dial server at localhost:34507
   INFO:apache_beam.utils.subprocess_server:    caused by:
   INFO:apache_beam.utils.subprocess_server:context deadline exceeded
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.taskmanager.Task transitionState
   INFO:apache_beam.utils.subprocess_server:WARNING: [3]ReadFromKafka(beam:transform:org.apache.beam:kafka_read_without_metadata:v1)/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/4)#0 (359af0cc3ed2f4c625e558bebd61c64b_24761c05670fa5069ce6a1b3d4c931eb_0_0) switched from INITIALIZING to FAILED with failure cause:
   INFO:apache_beam.utils.subprocess_server:org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5
   ....
   INFO:apache_beam.utils.subprocess_server:INFO: Stopping Pekko RPC service.
   INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.pekko.event.slf4j.Slf4jLogger$$anonfun$receive$1 $anonfun$applyOrElse$3
   INFO:apache_beam.utils.subprocess_server:INFO: Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService closeAsync
   INFO:apache_beam.utils.subprocess_server:INFO: Stopping Pekko RPC service.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService lambda$closeAsync$7
   INFO:apache_beam.utils.subprocess_server:INFO: Stopped Pekko RPC service.
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.pekko.event.slf4j.Slf4jLogger$$anonfun$receive$1 $anonfun$applyOrElse$3
   INFO:apache_beam.utils.subprocess_server:INFO: Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.AbstractBlobCache close
   INFO:apache_beam.utils.subprocess_server:INFO: Shutting down BLOB cache
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.AbstractBlobCache close
   INFO:apache_beam.utils.subprocess_server:INFO: Shutting down BLOB cache
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.BlobServer close
   INFO:apache_beam.utils.subprocess_server:INFO: Stopped BLOB server at 0.0.0.0:43151
   INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService lambda$closeAsync$7
   INFO:apache_beam.utils.subprocess_server:INFO: Stopped Pekko RPC service.
   start!
   Traceback (most recent call last):
     File "/home/chy/beam-llm/kafka-beam.py", line 196, in <module>
       run()
     File "/home/chy/beam-llm/kafka-beam.py", line 176, in run
       with beam.Pipeline(options=pipeline_options) as pipeline:
     File "/home/chy/beam-llm/beam/lib/python3.10/site-packages/apache_beam/pipeline.py", line 621, in __exit__
       self.result.wait_until_finish()
     File "/home/chy/beam-llm/beam/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
       raise self._runtime_exception
   RuntimeError: Pipeline BeamApp-chy-1008033013-53c548ce_c23a0db3-f61e-41fb-b989-f867c0923852 failed in state FAILED: java.lang.IllegalStateException: No container running for id a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5
   ```
   
   
   **The error suggests that the Docker container did not start, but `docker ps` shows that Flink did start a container and that it is running. However, the container apparently never executed the task: its log reports `Failed to obtain provisioning information: failed to dial server at localhost:34507`.**
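The container log line `failed to dial server at localhost:34507` means the Java SDK container (started for the cross-language `ReadFromKafka` transform) could not open a connection to its provisioning endpoint. One possible explanation, not confirmed here, is that `localhost` inside the container does not refer to the host where that endpoint listens. A minimal stdlib sketch for checking whether a `host:port` accepts TCP connections is below; running it on the host, and where possible from a container started the same way, may help narrow this down. The script name `check_port.py` is hypothetical, and the port is assigned per run, so 34507 is only the value from this particular log.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False


if __name__ == "__main__":
    import sys

    # Usage: python check_port.py HOST PORT (defaults to the address in the log).
    host = sys.argv[1] if len(sys.argv) > 1 else "localhost"
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 34507
    print(f"{host}:{port} reachable: {can_connect(host, port)}")
```

A reachable result on the host combined with an unreachable one from inside a container would point at container networking rather than at Kafka or the pipeline code.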
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.