max-lepikhin opened a new issue, #24367: URL: https://github.com/apache/beam/issues/24367
### What happened?

1. Per the Beam [docs](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/), Python code is expected to be passed to Flink workers via `workflow.tar.gz`. The path to the local `workflow.tar.gz` (created by the Python Stager) is stored in the `ArtifactInformation` proto [here](https://github.com/apache/beam/blob/5d2dbf957e4e82fb3980726940df02ac67e563cd/sdks/python/apache_beam/runners/portability/stager.py#L289) with the [FILE](https://github.com/apache/beam/blob/7dac3f5ef40b5d24b24d9ce5bec4717284260b85/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1426) type_urn.
2. When using the Flink job server, `workflow.tar.gz` is staged to a `/tmp` directory in [reverseArtifactRetrievalService](https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java#L386).
3. The path to that locally staged file is passed to the Flink worker. When the worker attempts to read the file, the job fails, for example:

```
java.io.FileNotFoundException: /tmp/beam-tempokzovntq/artifacts8dekjpv4/14df1e909b80d61fc4083e34c254f7dc5cd2386afabe8a5c8997a08136bf6a97/1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
    at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
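For context, the failure above shows up with an ordinary portable submission to the Flink job server. Below is a minimal sketch of such a submission; the Flink master address, the external worker-pool endpoint, and the toy pipeline are illustrative assumptions, not values taken from this report:

```python
# Minimal sketch (assumed addresses) of a Python pipeline submitted to a
# Flink job server with an EXTERNAL environment, i.e. the setup in which the
# locally staged /tmp artifact path ends up being handed to remote workers.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",         # assumed Flink cluster address
    "--environment_type=EXTERNAL",           # workers managed by ExternalEnvironmentFactory
    "--environment_config=localhost:50000",  # assumed external worker-pool endpoint
])

# The Python Stager packages the local code into workflow.tar.gz; its path is
# recorded in ArtifactInformation, re-staged under /tmp on the job-server
# host, and that /tmp path is what the Flink worker later fails to open.
with beam.Pipeline(options=options) as p:
    (p | beam.Create(["a", "b", "c"]) | beam.Map(print))
```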
Beam's `PipelineOptions` [artifacts_dir](https://github.com/apache/beam/blob/883a362c930aca4298551697d7aaacbe7b6602f1/sdks/python/apache_beam/options/pipeline_options.py#L1428) suggests it is possible to stage artifacts in a cloud bucket/container (see the sketch at the end of this report). However:

- The only available FileSystem is the [local](https://github.com/apache/beam/blob/b5a0f4895ccf7bd2f8b03217d866e8d0c67f6f52/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L81) filesystem.
- There is a [TODO](https://github.com/apache/beam/blob/b5a0f4895ccf7bd2f8b03217d866e8d0c67f6f52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L71) to expose filesystem options in the Flink job server driver.
- A test with `azfs://` failed with the following:

```
I1127 22:20:24.765129 140273151465216 subprocess_server.py:126] Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme azfs
I1127 22:20:24.765187 140273151465216 subprocess_server.py:126] at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:518)
I1127 22:20:24.765243 140273151465216 subprocess_server.py:126] at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:588)
I1127 22:20:24.765300 140273151465216 subprocess_server.py:126] at org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$1.stagingDir(ArtifactStagingService.java:193)
I1127 22:20:24.765356 140273151465216 subprocess_server.py:126] at org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$1.getDestination(ArtifactStagingService.java:169)
I1127 22:20:24.765413 140273151465216 subprocess_server.py:126] at org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$StoreArtifact.call(ArtifactStagingService.java:271)
I1127 22:20:24.765469 140273151465216 subprocess_server.py:126] at org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$StoreArtifact.call(ArtifactStagingService.java:247)
I1127 22:20:24.765530 140273151465216 subprocess_server.py:126] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
```

As a side note, another bug is that `ExternalEnvironmentFactory` [keeps waiting](https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L125) for boot.go to become healthy even after boot.go fails to read the workflow artifact ([here](https://github.com/apache/beam/blob/5d2dbf957e4e82fb3980726940df02ac67e563cd/sdks/python/container/boot.go#L369)) and exits. I am also not sure whether Flink can take embedded files as an input; that would be a good option for `workflow.tar.gz`.

### Issue Priority

Priority: 1

### Issue Component

Component: runner-flink
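As referenced above, here is a minimal sketch of the attempted `--artifacts_dir` workaround. The Azure path and all endpoints are placeholders, not values from this report; with the current job server the submission fails with the "No filesystem found for scheme azfs" error shown above, because no FileSystem for the `azfs` scheme is registered on the job server's classpath.

```python
# Sketch of the attempted workaround (assumed values throughout): point
# artifact staging at a shared location instead of the job server's local
# /tmp, so that Flink workers could actually retrieve workflow.tar.gz.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",          # assumed
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",   # assumed
    # Hypothetical shared staging location; today this fails with
    # "No filesystem found for scheme azfs" as shown in the log above.
    "--artifacts_dir=azfs://myaccount/mycontainer/beam-artifacts",
])
```

A directory that is shared between the job server and the workers (for example a mounted volume) would be another way to exercise `--artifacts_dir` with the local filesystem, but staging to a cloud bucket requires the corresponding FileSystem registrar to be available to the job server, which is what the TODO in FlinkJobServerDriver points at.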
