We're working on running an "expansion service as a service" for xlang transforms. One of the things we'd really like is to serve the required artifacts to the client submitting the pipeline directly from our blobstore, rather than streaming them through the artifact retrieval API (GetArtifact).
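By "serving URLs" I mean having the expansion response carry URL-typed artifacts rather than FILE or EMBEDDED ones, roughly like the following (a sketch: the blobstore URL and staged name are made up; the proto and urn names are from the model protos):

    from apache_beam.portability import common_urns
    from apache_beam.portability.api import beam_runner_api_pb2

    # A URL-typed artifact pointing at a blob in our store (URL illustrative).
    url_artifact = beam_runner_api_pb2.ArtifactInformation(
        type_urn=common_urns.artifact_types.URL.urn,
        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
            url='https://blobstore.example.com/artifacts/my_deps.jar'
        ).SerializeToString(),
        role_urn=common_urns.artifact_roles.STAGING_TO.urn,
        role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
            staged_name='my_deps.jar').SerializeToString())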
I have the expansion service returning the required artifact URLs when expanding the transforms, but past that point I've run into some confusion about how this is supposed to work. With the direct runner, the local artifact retrieval service [1] correctly downloads the URL resource. Running on Dataflow, however, breaks the stager, since it only supports FILE-type artifacts [2]. I _think_ what should happen is that artifact_service.maybe_store_artifact [3] should download URL artifacts locally and replace them with a temp file resource (similar to how store_resource works). I made this change locally and was then able to submit jobs via both the direct runner and Dataflow; a rough sketch follows the references. Any thoughts here?

[1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/artifact_service.py#L76
[2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L579
[3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/artifact_service.py#L284
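For concreteness, here's roughly the shape of the change I tried in maybe_store_artifact (a sketch, not the actual patch: the existing FILE/EMBEDDED branches are elided, and urllib is just a stand-in for however the URL fetch should actually happen):

    import tempfile
    import urllib.request

    from apache_beam.portability import common_urns
    from apache_beam.portability.api import beam_runner_api_pb2

    def maybe_store_artifact(artifact, service, dest_dir):
      if artifact.type_urn == common_urns.artifact_types.URL.urn:
        payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString(
            artifact.type_payload)
        # Download the URL resource into dest_dir...
        with tempfile.NamedTemporaryFile(dir=dest_dir, delete=False) as fout:
          fout.write(urllib.request.urlopen(payload.url).read())
        # ...and rewrite the artifact as a FILE artifact pointing at the
        # temp file, so FILE-only stagers (e.g. Dataflow's) can handle it.
        return beam_runner_api_pb2.ArtifactInformation(
            type_urn=common_urns.artifact_types.FILE.urn,
            type_payload=beam_runner_api_pb2.ArtifactFilePayload(
                path=fout.name).SerializeToString(),
            role_urn=artifact.role_urn,
            role_payload=artifact.role_payload)
      # ...existing handling for FILE, EMBEDDED, and other types unchanged...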
