thailb-developer opened a new issue, #30000:
URL: https://github.com/apache/beam/issues/30000

   Hi everyone,
   
   _OS: macOS 14.2.1 (Sonoma)
   Processor: Apple M1
   Docker version 24.0.7, build afdd53b
   Docker Compose version v2.23.3-desktop.2_
   
   I am a beginner at using Apache Beam with the Flink runner. I am following 
the documentation and running the pipeline with `FlinkRunner`. I see that the 
pipeline finishes as `DONE`. However, I also see the following errors in the 
Flink task manager log:
   
   ```
   taskmanager-1       | Jan 12, 2024 10:31:36 AM 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise 
safeExecute
   taskmanager-1       | SEVERE: Failed to submit a listener notification task. 
Event loop shut down?
   taskmanager-1       | java.lang.NoClassDefFoundError: 
org/apache/beam/vendor/grpc/v1p54p0/io/netty/util/concurrent/GlobalEventExecutor$2
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:228)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor.execute0(GlobalEventExecutor.java:216)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:210)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:862)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:500)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:97)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:1059)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   taskmanager-1       |        at 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
   taskmanager-1       |        at java.base/java.lang.Thread.run(Unknown 
Source)
   taskmanager-1       | Caused by: java.lang.ClassNotFoundException: 
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor$2
   taskmanager-1       |        at 
java.base/java.net.URLClassLoader.findClass(Unknown Source)
   taskmanager-1       |        at 
java.base/java.lang.ClassLoader.loadClass(Unknown Source)
   taskmanager-1       |        at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
   taskmanager-1       |        at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
   taskmanager-1       |        at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
   taskmanager-1       |        at 
java.base/java.lang.ClassLoader.loadClass(Unknown Source)
   taskmanager-1       |        ... 12 more
   ```
   
   Checking further, I also see this warning in the log:
   
   ```
   taskmanager-1       | 2024-01-12 10:31:35,885 WARN  
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Error 
cleaning up servers urn: "beam:env:external:v1"
   ```
   
   My worker pool also yields errors:
   
   ```
   pythonworkerpool-1  | 2024/01/12 10:31:35 boot.go: error logging message 
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
   pythonworkerpool-1  | 2024/01/12 10:31:35 DEBUG Received signal: terminated
   pythonworkerpool-1  | 2024/01/12 10:31:35 boot.go: error logging message 
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
   pythonworkerpool-1  | 2024/01/12 10:31:35 ERROR 0
   pythonworkerpool-1  | 1
   pythonworkerpool-1  | 2
   pythonworkerpool-1  | 3
   pythonworkerpool-1  | 4
   pythonworkerpool-1  | 5
   pythonworkerpool-1  | 6
   pythonworkerpool-1  | 7
   pythonworkerpool-1  | 8
   pythonworkerpool-1  | 9
   pythonworkerpool-1  |
   pythonworkerpool-1  | 2024/01/12 10:31:35 boot.go: error logging message 
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
   pythonworkerpool-1  | 2024/01/12 10:31:35 WARN Python (worker 1-1) exited 1 
times: signal: terminated
   pythonworkerpool-1  | restarting SDK process
   pythonworkerpool-1  | 2024/01/12 10:31:35 boot.go: error logging message 
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
   pythonworkerpool-1  | 2024/01/12 10:31:35 DEBUG Cleaned up temporary venv 
for worker 1-1.
   ```
   
   NOTE: The numbers 0 to 9 are printed by the pipeline. That output is correct.
   
   Here is my `docker-compose` file:
   
   ```
   version: '3.8'
   services:
     jobmanager:
       image: flink:1.16.3-java11
       networks:
         - flink-network
       ports:
         - "8081:8081"
       command: jobmanager
       environment:
         - |
           FLINK_PROPERTIES=jobmanager.rpc.address: jobmanager
         - BEAM_WORKER_POOL_IN_DOCKER_VM=1
         - DOCKER_MAC_CONTAINER=1
       volumes:
       - ./src:/tmp/src
       - ./out:/tmp/out
   
     taskmanager:
       image: flink:1.16.3-java11
       networks:
         - flink-network
       ports:
         - "8100-8200:8100-8200"
       depends_on:
         - jobmanager
         - pythonworkerpool
       command: taskmanager
       scale: 1
       environment:
         - |
           FLINK_PROPERTIES=jobmanager.rpc.address: jobmanager
           taskmanager.numberOfTaskSlots: 2
         - BEAM_WORKER_POOL_IN_DOCKER_VM=1
         - DOCKER_MAC_CONTAINER=1
       volumes:
       - ./src:/tmp/src
       - ./out:/tmp/out
   
     pythonworkerpool:
       image: apache/beam_python3.11_sdk:latest
       networks:
         - flink-network
       entrypoint: /opt/apache/beam/boot
       command: --worker_pool
       volumes:
         - ./src:/tmp/src
         - ./out:/tmp/out
   
   
   networks:
     flink-network:
       name: flink-network
   
   ```
   
   And here is my dummy pipeline (copied from someone else on the internet):
   
   ```Python
   import argparse
   import logging
   from typing import Tuple, Optional, TypeVar
   
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   
   T = TypeVar("T")
   
   
   @beam.typehints.with_input_types(
       element=T, duration=float, variation=Optional[Tuple[float, float]]
   )
   @beam.typehints.with_output_types(T)
   class SleepFn(beam.DoFn):
       def process(self, element, duration=0.5, variation=None, **kwargs):
           import time
           import random
   
           if variation:
               duration += random.uniform(*variation)
           time.sleep(duration)
           yield element
   
   
   def main(options=None):
       with beam.Pipeline(options=options) as pipe:
           (
               pipe
               | "Init" >> beam.Create(range(10))
               | "Sleep" >> beam.ParDo(SleepFn(), duration=1.0)
               | "Log" >> beam.Map(print)
           )
   
   
   if __name__ == "__main__":
       logging.getLogger().setLevel(logging.ERROR)
       parser = argparse.ArgumentParser()
       _, args = parser.parse_known_args()
       options = PipelineOptions(args)
       main(options=options)
   
   ```
   
   I searched the internet but couldn't find any clue on how to fix this. I 
hope you can help me here. Much appreciated.

