Can you please add this to the design documents webpage? https://beam.apache.org/contribute/design-documents/
On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
> On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <m...@apache.org> wrote:
>>
>> Here's the first draft: https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>>
>> It's rather high-level. We may want to add more details once we have finalized the design. Feel free to make comments and edits.
>
> Thanks Max. Added some comments.
>
>> > All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.
>>
>> +1 I came to the same conclusion while thinking about how to store artifact information for deferred execution of the pipeline.
>>
>> -Max
>>
>> On 07.05.19 18:10, Robert Bradshaw wrote:
>> > Looking forward to your writeup, Max. In the meantime, some comments below.
>> >
>> > From: Lukasz Cwik <lc...@google.com>
>> > Date: Thu, May 2, 2019 at 6:45 PM
>> > To: dev
>> >
>> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>>
>> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>> >>>>
>> >>>> We should stick with URN + payload + artifact metadata[1] where the only mandatory one that all SDKs and expansion services understand is the "bytes" artifact type. This allows us to add optional URNs for file://, http://, Maven, PyPi, ... in the future. I would make the artifact staging service use the same URN + payload mechanism to get compatibility of artifacts across the different services and also have the artifact staging service be able to be queried for the list of artifact types it supports.
>> >>>
>> >>> +1
>> >>>
>> >>>> Finally, we would need to have environments enumerate the artifact types that they support.
>> >>>
>> >>> Meaning at runtime, or as another field statically set in the proto?
>> >>
>> >> I don't believe runners/SDKs should have to know what artifacts each environment supports at runtime and instead have environments enumerate them explicitly in the proto. I have been thinking about a more general "capabilities" block on environments which allows them to enumerate URNs that the environment understands. This would include artifact type URNs, PTransform URNs, coder URNs, ... I haven't proposed anything specific down this line yet because I was wondering how environment resources (CPU, min memory, hardware like GPU, AWS/GCP/Azure/... machine types) should/could tie into this.
>> >>
>> >>>> Having everyone have the same "artifact" representation would be beneficial since:
>> >>>> a) Python environments could install dependencies from a requirements.txt file (something that the Google Cloud Dataflow Python docker container allows for today)
>> >>>> b) It provides an extensible and versioned mechanism for SDKs, environments, and artifact staging/retrieval services to support additional artifact types
>> >>>> c) It allows for expressing a canonical representation of an artifact like a Maven package so a runner could merge environments that the runner deems compatible.
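A minimal Python sketch of the URN + payload idea and of an environment that enumerates the artifact types it supports, as discussed above. The URNs, class names, and fields here are placeholders chosen for illustration, not the actual Beam protos:

    from dataclasses import dataclass, field
    from typing import List

    # Placeholder URNs for illustration only; the real Beam URNs may differ.
    BYTES_ARTIFACT_URN = "beam:artifact:type:bytes:v1"
    MAVEN_ARTIFACT_URN = "beam:artifact:type:maven:v1"

    @dataclass
    class Artifact:
        # URN identifying the artifact type, plus an opaque type-specific
        # payload (e.g. raw bytes, a file path, or a Maven coordinate).
        type_urn: str
        payload: bytes

    @dataclass
    class Environment:
        # An environment that explicitly enumerates the artifact type URNs it
        # understands; "bytes" is the only type everyone must support.
        urn: str
        supported_artifact_types: List[str] = field(
            default_factory=lambda: [BYTES_ARTIFACT_URN])
        artifacts: List[Artifact] = field(default_factory=list)

    def understands(env: Environment, artifact: Artifact) -> bool:
        # An SDK or runner can check this before deciding whether an artifact
        # must be converted to the mandatory "bytes" representation.
        return artifact.type_urn in env.supported_artifact_types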
>> >>>> The flow I could see is:
>> >>>> 1) (optional) query artifact staging service for supported artifact types
>> >>>> 2) SDK requests expansion service to expand transform, passing in a list of artifact types the SDK and artifact staging service support; the expansion service returns a list of artifact types limited to those supported types + any supported by the environment
>> >>>
>> >>> The crux of the issue seems to be how the expansion service returns the artifacts themselves. Is this going with the approach that the caller of the expansion service must host an artifact staging service?
>> >>
>> >> The caller would not need to host an artifact staging service (but would become effectively a proxy service, see my comment below for more details) as I would have expected this to be part of the expansion service response.
>> >>
>> >>> There is also the question here of how the returned artifacts get attached to the various environments, or whether they get implicitly applied to all returned stages (which need not have a consistent environment)?
>> >>
>> >> I would suggest returning additional information that says which artifact is for which environment. Applying all artifacts to all environments is likely to cause issues since some environments may not understand certain artifact types or may get conflicting versions of artifacts. I would see this happening since an expansion service that aggregates other expansion services seems likely, for example:
>> >>                              /-> ExpansionService(Python)
>> >> ExpansionService(Aggregator) --> ExpansionService(Java)
>> >>                              \-> ExpansionService(Go)
>> >
>> > All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.
>> >
>> >>>> 3) SDK converts any artifact types that the artifact staging service or environment doesn't understand, e.g. pulls down Maven dependencies and converts them to "bytes" artifacts
>> >>>
>> >>> Here I think we're conflating two things. The "type" of an artifact is both (1) how to fetch the bytes and (2) how to interpret them (e.g. is this a jar file, or a pip tarball, or just some data needed by a DoFn, or ...). Only (1) can be freely transmuted.
>> >>
>> >> You're right. Thinking about this some more, general artifact conversion is unlikely to be practical because how to interpret an artifact is environment dependent. For example, a requirements.txt used to install pip packages for a Python docker container depends on the filesystem layout of that specific docker container. One could simulate doing a pip install on the same filesystem, see the diff, and then do that for all the packages in requirements.txt, but this quickly becomes impractical.
>> >>
>> >>>> 4) SDK sends artifacts to artifact staging service
>> >>>> 5) Artifact staging service converts any artifacts to types that the environment understands
>> >>>> 6) Environment is started and gets artifacts from the artifact retrieval service.
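As a sketch only, assuming hypothetical client objects for the artifact staging and expansion services (none of the names below are real Beam APIs), steps 1-4 of the flow above might look roughly like:

    def to_bytes_artifact(artifact):
        # Placeholder: fetch the artifact (e.g. download a Maven jar) and
        # re-wrap its contents as the mandatory "bytes" artifact type.
        raise NotImplementedError

    def expand_and_stage(sdk_types, staging_service, expansion_service, transform):
        # 1) (optional) ask the staging service which artifact types it supports.
        staging_types = set(staging_service.supported_artifact_types())

        # 2) request expansion, passing the artifact types that both the SDK
        #    and the staging service can handle; the response lists artifacts
        #    limited to those types plus any the environment itself supports.
        response = expansion_service.expand(
            transform, supported_types=set(sdk_types) | staging_types)

        # 3) convert anything the staging service does not understand.
        staged = [a if a.type_urn in staging_types else to_bytes_artifact(a)
                  for a in response.artifacts]

        # 4) send the (possibly converted) artifacts to the staging service;
        #    steps 5 and 6 happen on the runner side and are not shown here.
        staging_service.stage(staged)
        return response.transform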
>> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>>>>
>> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>>>>
>> >>>>>> Good idea to let the client expose an artifact staging service that the ExpansionService could use to stage artifacts. This solves two problems:
>> >>>>>>
>> >>>>>> (1) The Expansion Service not being able to access the Job Server artifact staging service
>> >>>>>> (2) The client not having access to the dependencies returned by the Expansion Server
>> >>>>>>
>> >>>>>> The downside is that it adds an additional indirection. The alternative to let the client handle staging the artifacts returned by the Expansion Server is more transparent and easier to implement.
>> >>>>>
>> >>>>> The other downside is that it may not always be possible for the expansion service to connect to the artifact staging service (e.g. when constructing a pipeline locally against a remote expansion service).
>> >>>>
>> >>>> Just to make sure, you're saying the expansion service would return all the artifacts (bytes, urls, ...) as part of the response since the expansion service wouldn't be able to connect to the SDK that is running locally either.
>> >>>
>> >>> Yes. Well, I'm more asking how the expansion service would return any artifacts.
>> >>>
>> >>> What we have is
>> >>>
>> >>> Runner <--- SDK ---> Expansion service.
>> >>>
>> >>> Where the unidirectional arrow means "instantiates a connection with" and the other direction (and missing arrows) may not be possible.
>> >>
>> >> I believe the ExpansionService Expand request should become a unidirectional stream back to the caller so that artifacts could be sent back to the SDK (effectively mirroring the artifact staging service API). So the expansion response would stream back a bunch of artifact data messages and also the expansion response containing PTransform information.
>> >
>> > +1.
>> >
>> >>>>>> Ideally, the Expansion Service won't return any dependencies because the environment already contains the required dependencies. We could make it a requirement for the expansion to be performed inside an environment. Then we would already ensure during expansion time that the runtime dependencies are available.
>> >>>>>
>> >>>>> Yes, it's cleanest if the expansion service provides an environment with all the dependencies provided. Interesting idea to make this a property of the expansion service itself.
>> >>>>
>> >>>> I had thought this too but an opaque docker container that was built on top of a base Beam docker container would be very difficult for a runner to introspect and check to see if it's compatible to allow for fusion across PTransforms. I think artifacts need to be communicated in their canonical representation.
>> >>>
>> >>> It's clean (from the specification point of view), but doesn't allow for good introspection/fusion (aside from one being a base of another, perhaps).
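A rough sketch, under the assumption that the Expand call becomes a server-side stream that interleaves artifact data with the final PTransform payload (the message shapes below are invented for illustration, not actual gRPC definitions), of how the SDK side could consume such a response:

    def consume_expansion_stream(responses):
        # Collect artifact chunks (keyed by the environment they belong to)
        # and the expanded transform from a single stream of messages,
        # mirroring the artifact staging service API.
        artifacts_by_env = {}
        expanded_transform = None
        for msg in responses:  # an iterator over streamed response messages
            if msg.kind == "artifact_chunk":
                artifacts_by_env.setdefault(msg.environment_id, []).append(msg.data)
            elif msg.kind == "transform":
                expanded_transform = msg.transform
        return expanded_transform, artifacts_by_env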
>> >>>>>>> In this case, the runner would (as requested by its configuration) be free to merge environments it deemed compatible, including swapping out beam-java-X for beam-java-embedded if it considers itself compatible with the dependency list.
>> >>>>>>
>> >>>>>> Could you explain how that would work in practice?
>> >>>>>
>> >>>>> Say one has a pipeline with environments
>> >>>>>
>> >>>>> A: beam-java-sdk-2.12-docker
>> >>>>> B: beam-java-sdk-2.12-docker + dep1
>> >>>>> C: beam-java-sdk-2.12-docker + dep2
>> >>>>> D: beam-java-sdk-2.12-docker + dep3
>> >>>>>
>> >>>>> A runner could (conceivably) be intelligent enough to know that dep1 and dep2 are indeed compatible, and run A, B, and C in a single beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the corresponding fusion and lower overhead benefits). If a certain pipeline option is set, it might further note that dep1 and dep2 are compatible with its own workers, which are built against sdk-2.12, and choose to run these in an embedded + dep1 + dep2 environment.
>> >>>>
>> >>>> We have been talking about the expansion service and cross-language transforms a lot lately but I believe it will initially come at the cost of poor fusion of transforms, since "merging" environments that are compatible is a difficult problem and brings up many of the dependency management issues (e.g. diamond dependency issues).
>> >>>
>> >>> I agree. I think expansion services offering "kitchen-sink" containers, when possible, can go far here. If we could at least recognize when one environment/set of deps is a superset of another, that could be an easy case that would yield a lot of benefit as well.
>> >>
>> >> +1
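A toy sketch of the "superset" heuristic mentioned at the end, assuming environments are described by a base image plus a set of dependency identifiers (real dependency compatibility, e.g. the diamond dependency issues above, is much harder than the set comparison shown here):

    def try_merge(env_a, env_b):
        # Merge two environments only when they share a base image and one's
        # dependency set already contains the other's, so stages using either
        # environment can be fused into a single container.
        if env_a.base_image != env_b.base_image:
            return None                   # different base containers: keep separate
        deps_a, deps_b = set(env_a.dependencies), set(env_b.dependencies)
        if deps_a >= deps_b:
            return env_a                  # A already provides everything B needs
        if deps_b >= deps_a:
            return env_b
        return None                       # neither is a superset; do not merge

With the environments A-D above, this would fuse A with B (or A with C), but would leave B and C separate, which matches the point that anything beyond the superset case needs real dependency resolution.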