Looking forward to your writeup, Max. In the meantime, some comments below.


From: Lukasz Cwik <lc...@google.com>
Date: Thu, May 2, 2019 at 6:45 PM
To: dev

>
>
> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > We should stick with URN + payload + artifact metadata[1], where the only
>> > mandatory type that all SDKs and expansion services must understand is the
>> > "bytes" artifact type. This allows us to add optional URNs for file://,
>> > http://, Maven, PyPi, ... in the future. I would have the artifact staging
>> > service use the same URN + payload mechanism, so that artifacts stay
>> > compatible across the different services, and also make the artifact
>> > staging service queryable for the list of artifact types it supports.
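>> >
>> > For concreteness, a rough Python sketch of the shape I have in mind
>> > (in practice this would be a proto message; the names and URNs below
>> > are illustrative only, not a concrete proposal):
>> >
>> >     from dataclasses import dataclass
>> >
>> >     @dataclass
>> >     class Artifact:
>> >         type_urn: str        # which artifact type this is
>> >         type_payload: bytes  # URN-specific payload
>> >
>> >     # The one mandatory type everyone must understand:
>> >     inline = Artifact("beam:artifact:type:bytes:v1",
>> >                       b"<raw jar/wheel/file contents>")
>> >
>> >     # Optional types that can be added later without breaking anyone:
>> >     maven = Artifact("beam:artifact:type:maven:v1",
>> >                      b"org.apache.beam:beam-sdks-java-core:2.12.0")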
>>
>> +1
>>
>> > Finally, we would need to have environments enumerate the artifact types 
>> > that they support.
>>
>> Meaning at runtime, or as another field statically set in the proto?
>
>
> I don't believe runners/SDKs should have to discover at runtime which
> artifacts each environment supports; instead, environments should enumerate
> them explicitly in the proto. I have been thinking about a more general
> "capabilities" block on environments that allows them to enumerate the URNs
> the environment understands. This would include artifact type URNs,
> PTransform URNs, coder URNs, ... I haven't proposed anything specific along
> these lines yet because I was wondering how environment resources (CPU, min
> memory, hardware like GPUs, AWS/GCP/Azure/... machine types) should/could
> tie into this.
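>
> As a sketch (again, really a proto in practice; all URNs below are
> invented for illustration):
>
>     from dataclasses import dataclass, field
>     from typing import List
>
>     @dataclass
>     class Environment:
>         urn: str
>         payload: bytes
>         capabilities: List[str] = field(default_factory=list)
>
>     env = Environment(
>         urn="beam:env:docker:v1",
>         payload=b"beam-java-sdk-2.12-docker",
>         capabilities=[
>             "beam:artifact:type:bytes:v1",  # artifact types it accepts
>             "beam:coder:length_prefix:v1",  # coders it understands
>             "beam:transform:pardo:v1",      # transforms it can run
>         ],
>     )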
>
>>
>> > Having everyone share the same "artifact" representation would be
>> > beneficial since:
>> > a) Python environments could install dependencies from a requirements.txt
>> > file (something that the Google Cloud Dataflow Python docker container
>> > allows for today)
>> > b) It provides an extensible and versioned mechanism for SDKs,
>> > environments, and artifact staging/retrieval services to support
>> > additional artifact types
>> > c) It allows a canonical representation of an artifact, such as a Maven
>> > package, so a runner could merge environments that it deems compatible.
>> >
>> > The flow I could see is:
>> > 1) (optional) query the artifact staging service for its supported
>> > artifact types
>> > 2) SDK asks the expansion service to expand the transform, passing in a
>> > list of artifact types that the SDK and artifact staging service support;
>> > the expansion service returns artifacts limited to those supported types
>> > + any supported by the environment
>>
>> The crux of the issue seems to be how the expansion service returns
>> the artifacts themselves. Is this going with the approach that the
>> caller of the expansion service must host an artifact staging service?
>
>
> The caller would not need to host an artifact staging service (though it
> would effectively become a proxy service; see my comment below for more
> details), as I would expect this to be part of the expansion service
> response.
>
>>
>> There is also the question of how the returned artifacts get
>> attached to the various environments, or whether they get implicitly
>> applied to all returned stages (which need not have a consistent
>> environment)?
>
>
> I would suggest returning additional information that says which artifact is
> for which environment. Applying all artifacts to all environments is likely
> to cause issues, since some environments may not understand certain artifact
> types or may get conflicting versions of artifacts. I expect this to come up
> because an expansion service that aggregates other expansion services seems
> likely, for example:
>                              /-> ExpansionService(Python)
> ExpansionService(Aggregator) --> ExpansionService(Java)
>                              \-> ExpansionService(Go)
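>
> Concretely, the expansion response could key artifacts by environment
> id, something like (illustrative only):
>
>     # environment id -> artifacts belonging to that environment
>     artifacts_by_env = {
>         "py_env":   [("beam:artifact:type:pip:v1", b"numpy==1.16.2")],
>         "java_env": [("beam:artifact:type:maven:v1",
>                       b"org.apache.beam:beam-sdks-java-core:2.12.0")],
>         "go_env":   [("beam:artifact:type:bytes:v1", b"<binary>")],
>     }
>
> so the aggregator can forward each sub-service's artifacts untouched
> instead of applying everything to every environment.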

All of this goes back to the idea that I think the listing of
artifacts (or, more generally, dependencies) should be a property of
the environments themselves.

>> > 3) SDK converts any artifacts whose types the artifact staging service or
>> > environment doesn't understand, e.g. pulls down Maven dependencies and
>> > converts them to "bytes" artifacts
>>
>> Here I think we're conflating two things. The "type" of an artifact is
>> both (1) how to fetch the bytes and (2) how to interpret them (e.g. is
>> this a jar file, or a pip tarball, or just some data needed by a DoFn,
>> or ...). Only (1) can be freely transmuted.
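>>
>> A sketch of how the two might be split apart (all URNs invented for
>> illustration):
>>
>>     from dataclasses import dataclass
>>
>>     @dataclass
>>     class Artifact:
>>         fetch_urn: str       # (1) how to get the bytes
>>         fetch_payload: bytes
>>         role_urn: str        # (2) how to interpret/use them
>>         role_payload: bytes
>>
>>     jar = Artifact("beam:artifact:fetch:url:v1",
>>                    b"https://repo.maven.apache.org/.../foo.jar",
>>                    "beam:artifact:role:classpath_jar:v1",
>>                    b"foo.jar")
>>
>>     # Transmuting (1) is safe: fetch the URL, re-stage as raw bytes.
>>     jar.fetch_urn = "beam:artifact:fetch:bytes:v1"
>>     jar.fetch_payload = b"<jar contents>"
>>     # (2) must stay fixed: it's still a classpath jar.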
>
>
> You're right. Thinking about this some more, general artifact conversion is
> unlikely to be practical because how to interpret an artifact is environment
> dependent. For example, a requirements.txt used to install pip packages for a
> Python docker container depends on the filesystem layout of that specific
> docker container. One could simulate doing a pip install on the same
> filesystem and capture the resulting diff for all the packages in
> requirements.txt, but this quickly becomes impractical.
>
>>
>> > 4) SDK sends artifacts to artifact staging service
>> > 5) Artifact staging service converts any artifacts to types that the 
>> > environment understands
>> > 6) Environment is started and gets artifacts from the artifact retrieval 
>> > service.
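>> >
>> > Putting the whole flow together as pseudocode (none of these calls
>> > are real Beam APIs, just the sequence of steps 1-6):
>> >
>> >     def expand_and_stage(sdk, staging, expansion, transform_spec):
>> >         # 1) (optional) ask what the staging service accepts
>> >         staged_types = staging.supported_artifact_types()
>> >         # 2) expand, advertising what SDK + staging support
>> >         resp = expansion.expand(transform_spec,
>> >                                 supported_types=staged_types)
>> >         # 3) convert anything unsupported (e.g. Maven -> bytes)
>> >         arts = [a if a.type_urn in staged_types else sdk.to_bytes(a)
>> >                 for a in resp.artifacts]
>> >         # 4) stage the artifacts
>> >         staging.stage(arts)
>> >         # 5)+6) the staging service converts as needed; the
>> >         # environment starts and pulls them from the retrieval
>> >         # service.
>> >         return resp.transform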
>> >
>> > On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <rober...@google.com> 
>> > wrote:
>> >>
>> >> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <m...@apache.org> 
>> >> wrote:
>> >> >
>> >> > Good idea to let the client expose an artifact staging service that the
>> >> > ExpansionService could use to stage artifacts. This solves two problems:
>> >> >
>> >> > (1) The Expansion Service not being able to access the Job Server
>> >> > artifact staging service
>> >> > (2) The client not having access to the dependencies returned by the
>> >> > Expansion Service
>> >> >
>> >> > The downside is that it adds an additional indirection. The
>> >> > alternative, letting the client handle staging the artifacts returned
>> >> > by the Expansion Service, is more transparent and easier to implement.
>> >>
>> >> The other downside is that it may not always be possible for the
>> >> expansion service to connect to the artifact staging service (e.g.
>> >> when constructing a pipeline locally against a remote expansion
>> >> service).
>> >
>> > Just to make sure: you're saying the expansion service would return all
>> > the artifacts (bytes, urls, ...) as part of the response, since the
>> > expansion service wouldn't be able to connect to the SDK that is running
>> > locally either.
>>
>> Yes. Well, more precisely, I'm asking how the expansion service would
>> return any artifacts.
>>
>> What we have is
>>
>> Runner <--- SDK ---> Expansion service.
>>
>> Where the unidirectional arrow means "instantiates a connection with";
>> connections in the other direction (and where arrows are missing) may
>> not be possible.
>
>
> I believe the ExpansionService Expand request should become a unidirectional
> stream back to the caller so that artifacts can be sent back to the SDK
> (effectively mirroring the artifact staging service API). The Expand call
> would then stream back a bunch of artifact data messages followed by the
> expansion response containing the PTransform information.
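>
> Roughly, the caller would consume the stream like this (message and
> method names invented for illustration):
>
>     def consume_expand_stream(stream, staging):
>         """Artifact data messages arrive first (mirroring the artifact
>         staging API); the expansion result arrives last."""
>         for msg in stream:
>             if hasattr(msg, "artifact_data"):   # an artifact chunk
>                 staging.put(msg.artifact_data)  # forward / stage it
>             else:
>                 return msg                      # the ExpansionResponse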

+1.

>> >> > Ideally, the Expansion Service won't return any dependencies because the
>> >> > environment already contains the required dependencies. We could make it
>> >> > a requirement for the expansion to be performed inside an environment.
>> >> > Then we would already ensure at expansion time that the runtime
>> >> > dependencies are available.
>> >>
>> >> Yes, it's cleanest if the expansion service provides an environment
>> >> with all the dependencies already baked in. Interesting idea to make
>> >> this a property of the expansion service itself.
>> >
>> > I had thought this too, but an opaque docker container that was built on
>> > top of a base Beam docker container would be very difficult for a runner
>> > to introspect and check for compatibility in order to allow fusion
>> > across PTransforms. I think artifacts need to be communicated in their
>> > canonical representation.
>>
>> It's clean (from the specification point of view), but doesn't allow
>> for good introspection/fusion (aside from one being a base of another,
>> perhaps).
>>
>> >> > > In this case, the runner would (as
>> >> > > requested by its configuration) be free to merge environments it
>> >> > > deemed compatible, including swapping out beam-java-X for
>> >> > > beam-java-embedded if it considers itself compatible with the
>> >> > > dependency list.
>> >> >
>> >> > Could you explain how that would work in practice?
>> >>
>> >> Say one has a pipeline with environments
>> >>
>> >> A: beam-java-sdk-2.12-docker
>> >> B: beam-java-sdk-2.12-docker + dep1
>> >> C: beam-java-sdk-2.12-docker + dep2
>> >> D: beam-java-sdk-2.12-docker + dep3
>> >>
>> >> A runner could (conceivably) be intelligent enough to know that dep1
>> >> and dep2 are indeed compatible, and run A, B, and C in a single
>> >> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
>> >> corresponding fusion and lower-overhead benefits). If a certain
>> >> pipeline option is set, it might further note that dep1 and dep2 are
>> >> compatible with its own workers, which are built against sdk-2.12, and
>> >> choose to run these in an embedded + dep1 + dep2 environment.
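>> >>
>> >> A toy version of that merge logic (the compatibility check is of
>> >> course the hard part and is hand-waved here):
>> >>
>> >>     from dataclasses import dataclass
>> >>     from typing import FrozenSet, Optional
>> >>
>> >>     @dataclass(frozen=True)
>> >>     class Env:
>> >>         base: str             # e.g. "beam-java-sdk-2.12-docker"
>> >>         deps: FrozenSet[str]  # e.g. frozenset({"dep1-1.0"})
>> >>
>> >>     def conflicts(deps):
>> >>         # Naive: two versions of the same artifact name collide.
>> >>         names = [d.rsplit("-", 1)[0] for d in deps]
>> >>         return len(names) != len(set(names))
>> >>
>> >>     def maybe_merge(a: Env, b: Env) -> Optional[Env]:
>> >>         if a.base != b.base or conflicts(a.deps | b.deps):
>> >>             return None
>> >>         return Env(a.base, a.deps | b.deps)
>> >>
>> >>     A = Env("beam-java-sdk-2.12-docker", frozenset())
>> >>     B = Env("beam-java-sdk-2.12-docker", frozenset({"dep1-1.0"}))
>> >>     C = Env("beam-java-sdk-2.12-docker", frozenset({"dep2-1.0"}))
>> >>     AB = maybe_merge(A, B)                    # A and B merge cleanly
>> >>     ABC = maybe_merge(AB, C) if AB else None  # base + dep1 + dep2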
>> >
>> > We have been talking about the expansion service and cross-language
>> > transforms a lot lately, but I believe they will initially come at the
>> > cost of poor fusion of transforms, since "merging" compatible
>> > environments is a difficult problem that brings up many of the classic
>> > dependency management issues (e.g. diamond dependencies).
>>
>> I agree. I think expansion services offering "kitchen-sink"
>> containers, when possible, can go far here. If we could at least
>> recognize when one environment/set of deps is a superset of another,
>> that could be an easy case that would yield a lot of benefit as well.
>
>
> +1
