Looking forward to your writeup, Max. In the meantime, some comments below.
From: Lukasz Cwik <lc...@google.com>
Date: Thu, May 2, 2019 at 6:45 PM
To: dev

> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > We should stick with URN + payload + artifact metadata[1], where the only mandatory one that all SDKs and expansion services understand is the "bytes" artifact type. This allows us to add optional URNs for file://, http://, Maven, PyPi, ... in the future. I would make the artifact staging service use the same URN + payload mechanism to get compatibility of artifacts across the different services, and also have the artifact staging service be able to be queried for the list of artifact types it supports.
>>
>> +1
>>
>> > Finally, we would need to have environments enumerate the artifact types that they support.
>>
>> Meaning at runtime, or as another field statically set in the proto?
>
> I don't believe runners/SDKs should have to know at runtime what artifacts each environment supports; instead, environments should enumerate them explicitly in the proto. I have been thinking about a more general "capabilities" block on environments which would allow them to enumerate the URNs that the environment understands. This would include artifact type URNs, PTransform URNs, coder URNs, ... I haven't proposed anything specific down this line yet because I was wondering how environment resources (CPU, min memory, hardware like GPUs, AWS/GCP/Azure/... machine types) should/could tie into this.
>
>> > Having everyone use the same "artifact" representation would be beneficial since:
>> > a) Python environments could install dependencies from a requirements.txt file (something that the Google Cloud Dataflow Python docker container allows for today)
>> > b) It provides an extensible and versioned mechanism for SDKs, environments, and artifact staging/retrieval services to support additional artifact types
>> > c) It allows for expressing a canonical representation of an artifact, like a Maven package, so a runner could merge environments that the runner deems compatible.
>> >
>> > The flow I could see is:
>> > 1) (optional) query the artifact staging service for supported artifact types
>> > 2) SDK requests the expansion service to expand the transform, passing in a list of artifact types the SDK and artifact staging service support; the expansion service returns a list of artifact types limited to those supported types + any supported by the environment
>>
>> The crux of the issue seems to be how the expansion service returns the artifacts themselves. Is this going with the approach that the caller of the expansion service must host an artifact staging service?
>
> The caller would not need to host an artifact staging service (though it would effectively become a proxy service, see my comment below for more details), as I would have expected this to be part of the expansion service response.
>
>> There is also the question here of how the returned artifacts get attached to the various environments, or whether they get implicitly applied to all returned stages (which need not have a consistent environment)?
>
> I would suggest returning additional information that says which artifact is for which environment. Applying all artifacts to all environments is likely to cause issues, since some environments may not understand certain artifact types or may get conflicting versions of artifacts. I would see this happening since an expansion service that aggregates other expansion services seems likely, for example:
>
>                                /-> ExpansionService(Python)
>  ExpansionService(Aggregator) --> ExpansionService(Java)
>                                \-> ExpansionService(Go)

All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.
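For concreteness, a rough sketch of what a per-environment dependency listing with URN + payload could look like (the class names, field names, and URN strings below are purely illustrative, not the actual Beam protos):

# Illustrative sketch only -- these are not the real Beam proto messages or URNs.
from dataclasses import dataclass, field
from typing import List

@dataclass
class ArtifactInformation:
    type_urn: str        # how to fetch/interpret the payload, e.g. "beam:artifact:type:bytes:v1"
    type_payload: bytes  # interpreted according to type_urn
    role_urn: str = ""   # what the artifact is for, e.g. a pip requirements file

@dataclass
class Environment:
    urn: str             # e.g. "beam:env:docker:v1"
    payload: bytes       # e.g. the container image name
    dependencies: List[ArtifactInformation] = field(default_factory=list)
    capabilities: List[str] = field(default_factory=list)  # artifact/coder/transform type URNs it understands

# An aggregating expansion service could then attach each artifact to the
# environment that actually needs it:
java_env = Environment(
    urn="beam:env:docker:v1",
    payload=b"example/beam_java_sdk:2.12",
    dependencies=[ArtifactInformation("beam:artifact:type:url:v1",
                                      b"https://repo.example.com/kafka-clients-2.2.0.jar")],
    capabilities=["beam:artifact:type:bytes:v1", "beam:artifact:type:url:v1"],
)

With something along these lines, an aggregating expansion service could attach jars to the Java environment and pip packages to the Python environment, and the SDK, artifact staging service, and runner would all know which artifact belongs where.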
>> > 3) SDK converts any artifact types that the artifact staging service or environment doesn't understand, e.g. pulls down Maven dependencies and converts them to "bytes" artifacts
>>
>> Here I think we're conflating two things. The "type" of an artifact is both (1) how to fetch the bytes and (2) how to interpret them (e.g. is this a jar file, or a pip tarball, or just some data needed by a DoFn, or ...). Only (1) can be freely transmuted.
>
> You're right. Thinking about this some more, general artifact conversion is unlikely to be practical because how to interpret an artifact is environment dependent. For example, a requirements.txt used to install pip packages for a Python docker container depends on the filesystem layout of that specific docker container. One could simulate doing a pip install on the same filesystem, see the diff, and then ship all of the packages from requirements.txt, but this quickly becomes impractical.
>
>> > 4) SDK sends artifacts to the artifact staging service
>> > 5) Artifact staging service converts any artifacts to types that the environment understands
>> > 6) Environment is started and gets artifacts from the artifact retrieval service.
>> >
>> > On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>
>> >> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <m...@apache.org> wrote:
>> >> >
>> >> > Good idea to let the client expose an artifact staging service that the ExpansionService could use to stage artifacts. This solves two problems:
>> >> >
>> >> > (1) The Expansion Service not being able to access the Job Server artifact staging service
>> >> > (2) The client not having access to the dependencies returned by the Expansion Server
>> >> >
>> >> > The downside is that it adds an additional indirection. The alternative, letting the client handle staging the artifacts returned by the Expansion Server, is more transparent and easier to implement.
>> >>
>> >> The other downside is that it may not always be possible for the expansion service to connect to the artifact staging service (e.g. when constructing a pipeline locally against a remote expansion service).
>> >
>> > Just to make sure, you're saying the expansion service would return all the artifacts (bytes, urls, ...) as part of the response, since the expansion service wouldn't be able to connect to the SDK that is running locally either.
>>
>> Yes. Well, I'm more asking how the expansion service would return any artifacts.
>>
>> What we have is
>>
>> Runner <--- SDK ---> Expansion service.
>>
>> Where the unidirectional arrow means "instantiates a connection with" and the other direction (and missing arrows) may not be possible.
>
> I believe the ExpansionService Expand request should become a unidirectional stream back to the caller so that artifacts could be sent back to the SDK (effectively mirroring the artifact staging service API). So the expansion response would stream back a bunch of artifact data messages and also the expansion response containing the PTransform information.

+1.
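To make that streaming idea concrete, here is a rough sketch (made-up message and function names, not the actual ExpansionService or artifact staging protos) of a response stream that carries artifact chunks tagged by environment and ends with the expansion result, with the SDK acting as a proxy to its own artifact staging service:

# Illustrative sketch only -- not the real ExpansionService or artifact staging APIs.
from dataclasses import dataclass
from typing import Callable, Iterator, Optional, Tuple

@dataclass
class ExpansionResponseMessage:
    # Exactly one of the two fields is set per message (oneof-style).
    artifact: Optional[Tuple[str, str, bytes]] = None  # (environment_id, type_urn, data chunk)
    expanded_transform: Optional[dict] = None          # stand-in for the expanded PTransform + components

def expand(artifacts_by_env: dict, expanded: dict) -> Iterator[ExpansionResponseMessage]:
    """Expansion-service side: stream artifacts back to the caller (mirroring
    the artifact staging API), then finish with the expansion result."""
    chunk_size = 1 << 20
    for env_id, (type_urn, payload) in artifacts_by_env.items():
        for i in range(0, len(payload), chunk_size):
            yield ExpansionResponseMessage(
                artifact=(env_id, type_urn, payload[i:i + chunk_size]))
    yield ExpansionResponseMessage(expanded_transform=expanded)

def consume(stream: Iterator[ExpansionResponseMessage],
            stage_artifact: Callable[[str, str, bytes], None]) -> Optional[dict]:
    """SDK side: forward each artifact message to its own artifact staging
    service (acting as a proxy) and return the final expansion result."""
    result = None
    for msg in stream:
        if msg.artifact is not None:
            stage_artifact(*msg.artifact)
        else:
            result = msg.expanded_transform
    return result

This keeps every connection flowing from the SDK to the expansion service, matching the Runner <--- SDK ---> Expansion service picture above.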
>> >> > Ideally, the Expansion Service won't return any dependencies because the environment already contains the required dependencies. We could make it a requirement for the expansion to be performed inside an environment. Then we would already ensure during expansion time that the runtime dependencies are available.
>> >>
>> >> Yes, it's cleanest if the expansion service provides an environment with all the dependencies provided. Interesting idea to make this a property of the expansion service itself.
>> >
>> > I had thought this too, but an opaque docker container that was built on top of a base Beam docker container would be very difficult for a runner to introspect and check to see if it's compatible to allow for fusion across PTransforms. I think artifacts need to be communicated in their canonical representation.
>>
>> It's clean (from the specification point of view), but doesn't allow for good introspection/fusion (aside from one being a base of another, perhaps).
>>
>> >> > > In this case, the runner would (as requested by its configuration) be free to merge environments it deemed compatible, including swapping out beam-java-X for beam-java-embedded if it considers itself compatible with the dependency list.
>> >> >
>> >> > Could you explain how that would work in practice?
>> >>
>> >> Say one has a pipeline with environments
>> >>
>> >> A: beam-java-sdk-2.12-docker
>> >> B: beam-java-sdk-2.12-docker + dep1
>> >> C: beam-java-sdk-2.12-docker + dep2
>> >> D: beam-java-sdk-2.12-docker + dep3
>> >>
>> >> A runner could (conceivably) be intelligent enough to know that dep1 and dep2 are indeed compatible, and run A, B, and C in a single beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the corresponding fusion and lower overhead benefits). If a certain pipeline option is set, it might further note that dep1 and dep2 are compatible with its own workers, which are built against sdk-2.12, and choose to run these in an embedded + dep1 + dep2 environment.
>> >
>> > We have been talking about the expansion service and cross-language transforms a lot lately, but I believe it will initially come at the cost of poor fusion of transforms, since "merging" environments that are compatible is a difficult problem: it brings up many of the dependency management issues (e.g. diamond dependency issues).
>>
>> I agree. I think expansion services offering "kitchen-sink" containers, when possible, can go far here. If we could at least recognize when one environment/set of deps is a superset of another, that could be an easy case that would yield a lot of benefit as well.
>
> +1
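The superset case could be sketched roughly as below: a toy heuristic, assuming dependencies can be reduced to canonical ids, and, unlike the A/B/C/D example above, assuming one environment (C here) already contains the others' deps so the easy superset check applies:

# Toy sketch of a runner-side heuristic: fold an environment into another one
# with the same base image whose dependency set is a superset of its own.
# Dependency ids are assumed to be canonical strings; nothing here is real Beam code.
from typing import Dict, FrozenSet, Tuple

Env = Tuple[str, FrozenSet[str]]  # (base container image, canonical dependency ids)

def merge_compatible(envs: Dict[str, Env]) -> Dict[str, str]:
    """Map each environment id to the id of the environment it can run in."""
    assignment = {}
    for eid, (base, deps) in envs.items():
        target = eid
        for other_id, (other_base, other_deps) in envs.items():
            # Same base image and a dependency superset => a candidate to merge into.
            if (other_base == base and deps <= other_deps
                    and len(other_deps) >= len(envs[target][1])):
                target = other_id
        assignment[eid] = target
    return assignment

envs = {
    "A": ("beam-java-sdk-2.12-docker", frozenset()),
    "B": ("beam-java-sdk-2.12-docker", frozenset({"dep1"})),
    "C": ("beam-java-sdk-2.12-docker", frozenset({"dep1", "dep2"})),
    "D": ("beam-java-sdk-2.12-docker", frozenset({"dep3"})),
}
print(merge_compatible(envs))  # {'A': 'C', 'B': 'C', 'C': 'C', 'D': 'D'}

A "kitchen-sink" container offered by the expansion service is just the degenerate case where one environment's dependency set contains everyone else's.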