max-lepikhin opened a new issue, #24367:
URL: https://github.com/apache/beam/issues/24367

   ### What happened?
   
1. Per the Beam [docs](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/), Python code is expected to be shipped to Flink workers via `workflow.tar.gz`. The path to the local `workflow.tar.gz` (created by the Python `Stager`) is stored in the `ArtifactInformation` proto [here](https://github.com/apache/beam/blob/5d2dbf957e4e82fb3980726940df02ac67e563cd/sdks/python/apache_beam/runners/portability/stager.py#L289) with the [FILE](https://github.com/apache/beam/blob/7dac3f5ef40b5d24b24d9ce5bec4717284260b85/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1426) `type_urn`.
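
   For concreteness, here is a minimal sketch of what step 1 records, using plain dataclasses as illustrative stand-ins for the `ArtifactInformation`/`ArtifactFilePayload` protos (the field names are simplified; the `FILE` urn is the one defined in `beam_runner_api.proto`):

   ```python
   from dataclasses import dataclass

   # Illustrative stand-ins for the beam_runner_api protos; in the real
   # proto, type_payload is a serialized ArtifactFilePayload, not a
   # nested dataclass.
   FILE_TYPE_URN = "beam:artifact:type:file:v1"

   @dataclass
   class ArtifactFilePayload:
       path: str  # absolute path on the *submitting* machine

   @dataclass
   class ArtifactInformation:
       type_urn: str
       type_payload: ArtifactFilePayload

   def record_workflow_artifact(local_path: str) -> ArtifactInformation:
       # The Stager records only the local path of workflow.tar.gz;
       # nothing guarantees that path exists on a remote Flink worker.
       return ArtifactInformation(
           type_urn=FILE_TYPE_URN,
           type_payload=ArtifactFilePayload(path=local_path),
       )
   ```

   The proto carries nothing but the path string, which is what makes the cross-machine failure below possible.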
   
2. When using the Flink job server, `workflow.tar.gz` is staged into a `/tmp` directory in [reverseArtifactRetrievalService](https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java#L386).
   
   3. The path to the locally staged file is then passed to the Flink worker. When the worker attempts to read the file, the job fails, for example:
   ```
   java.io.FileNotFoundException: 
/tmp/beam-tempokzovntq/artifacts8dekjpv4/14df1e909b80d61fc4083e34c254f7dc5cd2386afabe8a5c8997a08136bf6a97/1-ref_Environment_default_e-workflow.tar.gz
 (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
        at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
        at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
        at 
org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
        at 
org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
        at 
org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
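
   The failure mode is easy to reproduce outside Beam: the staged path is only meaningful in the job server's filesystem namespace, so a process on another host or container fails the same way (the path below is illustrative, not the real staging location):

   ```python
   # A path that exists only on the job-server host looks like any other
   # string to the worker; opening it there fails as in the trace above.
   staged_path = "/tmp/beam-artifact-staging/does-not-exist/workflow.tar.gz"

   try:
       with open(staged_path, "rb"):
           pass
   except FileNotFoundError as e:
       print(f"worker-side failure: {e}")
   ```
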
4. Beam's `PipelineOptions` [artifacts_dir](https://github.com/apache/beam/blob/883a362c930aca4298551697d7aaacbe7b6602f1/sdks/python/apache_beam/options/pipeline_options.py#L1428) suggests it is possible to stage artifacts in a cloud bucket/container, but:
      - The only available FileSystem is the [local](https://github.com/apache/beam/blob/b5a0f4895ccf7bd2f8b03217d866e8d0c67f6f52/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L81) filesystem.
      - There is a [TODO](https://github.com/apache/beam/blob/b5a0f4895ccf7bd2f8b03217d866e8d0c67f6f52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L71) to expose filesystem options in the Flink job server driver.
      - A test with `azfs://` failed with the following:
   ```
   I1127 22:20:24.765129 140273151465216 subprocess_server.py:126] Caused by: 
java.lang.IllegalArgumentException: No filesystem found for scheme azfs
   I1127 22:20:24.765187 140273151465216 subprocess_server.py:126]      at 
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:518)
   I1127 22:20:24.765243 140273151465216 subprocess_server.py:126]      at 
org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:588)
   I1127 22:20:24.765300 140273151465216 subprocess_server.py:126]      at 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$1.stagingDir(ArtifactStagingService.java:193)
   I1127 22:20:24.765356 140273151465216 subprocess_server.py:126]      at 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$1.getDestination(ArtifactStagingService.java:169)
   I1127 22:20:24.765413 140273151465216 subprocess_server.py:126]      at 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$StoreArtifact.call(ArtifactStagingService.java:271)
   I1127 22:20:24.765469 140273151465216 subprocess_server.py:126]      at 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$StoreArtifact.call(ArtifactStagingService.java:247)
   I1127 22:20:24.765530 140273151465216 subprocess_server.py:126]      at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   ```
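
   The `azfs` error is the generic "unregistered scheme" path in `FileSystems`: only filesystem registrars available to the job server populate the scheme table, and the driver currently registers just the local filesystem. A hypothetical sketch of that lookup (not Beam's actual Java code):

   ```python
   # Hypothetical scheme -> filesystem table, mirroring what
   # FileSystems.getFileSystemInternal consults. On the Flink job server
   # only the local filesystem ends up registered.
   REGISTERED_FILESYSTEMS = {"file": "LocalFileSystem"}

   def get_filesystem(scheme: str) -> str:
       try:
           return REGISTERED_FILESYSTEMS[scheme]
       except KeyError:
           raise ValueError(f"No filesystem found for scheme {scheme}")
   ```

   Under this model, `get_filesystem("azfs")` raises the same message seen in the log above, regardless of whether the `azfs://` URL itself is valid.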
    
   As a side note, another bug is that ExternalEnvironmentFactory [keeps 
waiting](https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L125)
 for the boot.go to become healthy even after it fails to read the workflow 
artifact (in 
[here](https://github.com/apache/beam/blob/5d2dbf957e4e82fb3980726940df02ac67e563cd/sdks/python/container/boot.go#L369))
 and exits.
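
   That hang amounts to a readiness loop that polls for health but never checks whether the worker process is still alive. A hypothetical helper showing the liveness check that would surface the `boot.go` failure instead of waiting forever:

   ```python
   import subprocess
   import time

   # Hypothetical readiness helper: stop waiting as soon as the worker
   # process (boot.go) has exited, instead of polling health forever.
   def wait_until_healthy(proc: subprocess.Popen, is_healthy,
                          timeout_s: float = 60.0) -> None:
       deadline = time.monotonic() + timeout_s
       while time.monotonic() < deadline:
           if is_healthy():
               return
           if proc.poll() is not None:  # process already exited
               raise RuntimeError(
                   f"worker exited with code {proc.returncode}")
           time.sleep(0.1)
       raise TimeoutError("worker never became healthy")
   ```
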
   
It is not clear whether Flink can take embedded files as input; that would be a good option for `workflow.tar.gz`.
   
   ### Issue Priority
   
   Priority: 1
   
   ### Issue Component
   
   Component: runner-flink

