thailb-developer opened a new issue, #30000:
URL: https://github.com/apache/beam/issues/30000
Hi everyone,
_OS: macOS 14.2.1 (Sonoma)
Processor: Apple M1
Docker version 24.0.7, build afdd53b
Docker Compose version v2.23.3-desktop.2_
I am a beginner with Apache Beam and the Flink runner. I am trying to follow
the documentation and run a pipeline with `FlinkRunner`. The pipeline finishes
in the `DONE` state. However, I see the following error in the Flink task
manager log:
```
taskmanager-1 | Jan 12, 2024 10:31:36 AM
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise
safeExecute
taskmanager-1 | SEVERE: Failed to submit a listener notification task.
Event loop shut down?
taskmanager-1 | java.lang.NoClassDefFoundError:
org/apache/beam/vendor/grpc/v1p54p0/io/netty/util/concurrent/GlobalEventExecutor$2
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:228)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor.execute0(GlobalEventExecutor.java:216)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:210)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:862)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:500)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:97)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:1059)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
taskmanager-1 | at
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
taskmanager-1 | at java.base/java.lang.Thread.run(Unknown
Source)
taskmanager-1 | Caused by: java.lang.ClassNotFoundException:
org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.GlobalEventExecutor$2
taskmanager-1 | at
java.base/java.net.URLClassLoader.findClass(Unknown Source)
taskmanager-1 | at
java.base/java.lang.ClassLoader.loadClass(Unknown Source)
taskmanager-1 | at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
taskmanager-1 | at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
taskmanager-1 | at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
taskmanager-1 | at
java.base/java.lang.ClassLoader.loadClass(Unknown Source)
taskmanager-1 | ... 12 more
```
Looking further, I also see this warning just before it:
```
taskmanager-1 | 2024-01-12 10:31:35,885 WARN
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Error
cleaning up servers urn: "beam:env:external:v1"
```
My worker pool also reports errors:
```
pythonworkerpool-1 | 2024/01/12 10:31:35 boot.go: error logging message
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
pythonworkerpool-1 | 2024/01/12 10:31:35 DEBUG Received signal: terminated
pythonworkerpool-1 | 2024/01/12 10:31:35 boot.go: error logging message
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
pythonworkerpool-1 | 2024/01/12 10:31:35 ERROR 0
pythonworkerpool-1 | 1
pythonworkerpool-1 | 2
pythonworkerpool-1 | 3
pythonworkerpool-1 | 4
pythonworkerpool-1 | 5
pythonworkerpool-1 | 6
pythonworkerpool-1 | 7
pythonworkerpool-1 | 8
pythonworkerpool-1 | 9
pythonworkerpool-1 |
pythonworkerpool-1 | 2024/01/12 10:31:35 boot.go: error logging message
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
pythonworkerpool-1 | 2024/01/12 10:31:35 WARN Python (worker 1-1) exited 1
times: signal: terminated
pythonworkerpool-1 | restarting SDK process
pythonworkerpool-1 | 2024/01/12 10:31:35 boot.go: error logging message
over FnAPI. endpoint host.docker.internal:8101 error: EOF message follows
pythonworkerpool-1 | 2024/01/12 10:31:35 DEBUG Cleaned up temporary venv
for worker 1-1.
```
NOTE: The numbers 0 to 9 in the log above are printed by the pipeline itself. That output is correct.
Here is my `docker-compose` file:
```
version: '3.8'
services:
  jobmanager:
    image: flink:1.16.3-java11
    networks:
      - flink-network
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=jobmanager.rpc.address: jobmanager
      - BEAM_WORKER_POOL_IN_DOCKER_VM=1
      - DOCKER_MAC_CONTAINER=1
    volumes:
      - ./src:/tmp/src
      - ./out:/tmp/out
  taskmanager:
    image: flink:1.16.3-java11
    networks:
      - flink-network
    ports:
      - "8100-8200:8100-8200"
    depends_on:
      - jobmanager
      - pythonworkerpool
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
      - BEAM_WORKER_POOL_IN_DOCKER_VM=1
      - DOCKER_MAC_CONTAINER=1
    volumes:
      - ./src:/tmp/src
      - ./out:/tmp/out
  pythonworkerpool:
    image: apache/beam_python3.11_sdk:latest
    networks:
      - flink-network
    entrypoint: /opt/apache/beam/boot
    command: --worker_pool
    volumes:
      - ./src:/tmp/src
      - ./out:/tmp/out
networks:
  flink-network:
    name: flink-network
```
And here is my dummy pipeline (copied from someone else on the internet):
```Python
import argparse
import logging
from typing import Tuple, Optional, TypeVar

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

T = TypeVar("T")


@beam.typehints.with_input_types(
    element=T, duration=float, variation=Optional[Tuple[float, float]]
)
@beam.typehints.with_output_types(T)
class SleepFn(beam.DoFn):
    def process(self, element, duration=0.5, variation=None, **kwargs):
        import time
        import random

        if variation:
            duration += random.uniform(*variation)
        time.sleep(duration)
        yield element


def main(options=None):
    with beam.Pipeline(options=options) as pipe:
        (
            pipe
            | "Init" >> beam.Create(range(10))
            | "Sleep" >> beam.ParDo(SleepFn(), duration=1.0)
            | "Log" >> beam.Map(print)
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.ERROR)
    parser = argparse.ArgumentParser()
    _, args = parser.parse_known_args()
    options = PipelineOptions(args)
    main(options=options)
```
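The script above forwards unknown command-line arguments to `PipelineOptions`, but the exact flags used for this run are not listed here. As a rough sketch only, a setup with a Flink cluster and an external Python worker pool is usually launched with flags like the ones below. The helper name `build_flink_runner_args` is hypothetical, and the addresses `localhost:8081` and `localhost:50000` are assumptions, not values confirmed for this setup.

```python
from typing import List


def build_flink_runner_args(
    flink_master: str = "localhost:8081",  # assumed address of the Flink REST endpoint
    worker_pool: str = "localhost:50000",  # assumed address of the Python worker pool
) -> List[str]:
    """Return Beam pipeline flags for FlinkRunner with an EXTERNAL environment."""
    return [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",
        "--environment_type=EXTERNAL",
        f"--environment_config={worker_pool}",
    ]


if __name__ == "__main__":
    # Print the flags as they might be appended to the pipeline command line.
    print(" ".join(build_flink_runner_args()))
```

The returned list could be passed straight to `PipelineOptions(...)` in place of `args`, which may help when reproducing the problem with a fixed, known set of flags.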
I searched the internet but could not find any clue about how to fix this. I
hope someone here can help. Much appreciated.