Robert, you are correct that in principle the to/from functions are needed
on the operation, as that's where automatic conversion happens (in Java it
happens in DoFnRunner). However there are three blockers there:

1. As Brian mentioned, the issue in Java is that we never have
PCollection<Row> in this case. The source PCollection will simply be
PCollection<T>, where T has a schema. The to/from functions are now
required to interpret this PCollection. Currently we need to put it on the
PCollection itself to make Java's type system happy (an alternative is to
always create an intermediate PCollection<Row>, but that would be
computationally expensive). We might be able to find a way to model this in
Java with the to/from on the operation, however I suspect it would be
difficult and a lot of work.

2. I believe there are some cases where PTransforms access the to/from
functions in expand(), which is before we have an operation to attach
those functions to. Again this is presumably solvable, but would require
design and more work.

3. Currently the user can call setSchema on any PCollection, and pass in
to/from functions there. We would have to rethink this API.

So I think leaving it in the coder is the pragmatic approach for now,
though it would be interesting to see if we could solve the above issues
and instead automatically propagate the functions to the operation.
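
To make the coder-based approach concrete, here is a rough plain-Java
sketch. It does not use Beam's actual classes: SketchRow,
SketchSchemaCoder, SketchCollection and Purchase are all hypothetical
stand-ins. It only illustrates the shape of the idea, namely that the
collection stays typed as T, the to/from functions travel with its coder,
and a row-based operation converts on demand without an intermediate
collection of rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a schema row: an ordered field-name -> value map.
final class SketchRow {
  private final Map<String, Object> values = new LinkedHashMap<>();

  SketchRow with(String field, Object value) {
    values.put(field, value);
    return this;
  }

  Object get(String field) {
    return values.get(field);
  }
}

// A user type T that "has a schema" (fields: user, cents).
final class Purchase {
  final String user;
  final long cents;

  Purchase(String user, long cents) {
    this.user = user;
    this.cents = cents;
  }
}

// Hypothetical coder that carries the to/from functions alongside the type.
final class SketchSchemaCoder<T> {
  final Function<T, SketchRow> toRow;
  final Function<SketchRow, T> fromRow;

  SketchSchemaCoder(Function<T, SketchRow> toRow, Function<SketchRow, T> fromRow) {
    this.toRow = toRow;
    this.fromRow = fromRow;
  }

  static SketchSchemaCoder<Purchase> forPurchase() {
    return new SketchSchemaCoder<Purchase>(
        p -> new SketchRow().with("user", p.user).with("cents", p.cents),
        r -> new Purchase((String) r.get("user"), (Long) r.get("cents")));
  }
}

// Stand-in for PCollection<T>: elements stay typed as T, never as rows.
final class SketchCollection<T> {
  final List<T> elements;
  final SketchSchemaCoder<T> coder;

  SketchCollection(List<T> elements, SketchSchemaCoder<T> coder) {
    this.elements = elements;
    this.coder = coder;
  }

  // A row-based operation converts each element on demand via the coder's
  // toRow function, so no intermediate collection of rows is built.
  <R> List<R> applyToRows(Function<SketchRow, R> rowFn) {
    List<R> out = new ArrayList<>();
    for (T element : elements) {
      out.add(rowFn.apply(coder.toRow.apply(element)));
    }
    return out;
  }
}
```

In this sketch a call such as purchases.applyToRows(r -> r.get("user"))
plays the role of the automatic conversion. Moving toRow/fromRow onto the
operation would mean SketchCollection could no longer supply them itself,
which is roughly the typing problem described in point 1.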

I agree that we should not make these things opaque in the portable
representation, if only for ease of debugging. However they should not be
needed for cross-language calls.
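
As a rough illustration of that split (again plain Java with hypothetical
names, not Beam's proto or Java API, and with placeholder type labels
rather than real URNs): the field list is transparent and readable by
anyone, while the to/from functions are kept only on the Java side and
are simply not part of what a cross-language call sees.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// All names here are hypothetical; this is not Beam's proto or Java API.
final class FieldSketch {
  final String name;
  final String type; // placeholder type label, not a real URN

  FieldSketch(String name, String type) {
    this.name = name;
    this.type = type;
  }
}

// The transparent part: a list of field name -> field type pairs.
final class PortableSchemaSketch {
  final List<FieldSketch> fields;

  PortableSchemaSketch(List<FieldSketch> fields) {
    this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
  }

  List<String> fieldNames() {
    List<String> names = new ArrayList<>();
    for (FieldSketch f : fields) {
      names.add(f.name);
    }
    return names;
  }
}

// The Java-only part: to/from functions ride along with the schema locally.
final class JavaSchemaSketch<T> {
  final PortableSchemaSketch portable;
  final Function<T, Object[]> toRow;
  final Function<Object[], T> fromRow;

  JavaSchemaSketch(
      PortableSchemaSketch portable,
      Function<T, Object[]> toRow,
      Function<Object[], T> fromRow) {
    this.portable = portable;
    this.toRow = toRow;
    this.fromRow = fromRow;
  }

  // What a cross-language call would see: the schema alone, no functions.
  PortableSchemaSketch forCrossLanguage() {
    return portable;
  }
}

// Example user type with two fields.
final class UserScoreSketch {
  final String user;
  final long score;

  UserScoreSketch(String user, long score) {
    this.user = user;
    this.score = score;
  }
}

final class SchemaSketchExamples {
  static JavaSchemaSketch<UserScoreSketch> userScoreSchema() {
    List<FieldSketch> fields = new ArrayList<>();
    fields.add(new FieldSketch("user", "STRING"));
    fields.add(new FieldSketch("score", "INT64"));
    return new JavaSchemaSketch<UserScoreSketch>(
        new PortableSchemaSketch(fields),
        v -> new Object[] {v.user, v.score},
        row -> new UserScoreSketch((String) row[0], (Long) row[1]));
  }
}
```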

Reuven

On Tue, Jun 18, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> wrote:

> Thanks for updating that alternative.
>
> As for the to/from functions, it does seem pragmatic to dangle them
> off the purely portable representation (either as a field there, or as
> an opaque logical type whose payload contains the to/from functions,
> or a separate coder that wraps the schema coder (though I can't see
> how the latter would work well if nesting is allowed)) until we figure
> out a good way to attach them to the operations themselves.
>
> On Tue, Jun 18, 2019 at 2:37 AM Brian Hulette <bhule...@google.com> wrote:
> >
> > Realized I completely ignored one of your points, added another response
> inline.
> >
> > On Fri, Jun 14, 2019 at 2:20 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > Spoke to Brian about his proposal. It is essentially this:
> >> >
> >> > We create PortableSchemaCoder, with a well-known URN. This coder is
> parameterized by the schema (i.e. list of field name -> field type pairs).
> >>
> >> Given that we have a field type that is (list of field names -> field
> >> type pairs), is there a reason to do this enumeration at the top level
> >> as well? This would likely also eliminate some of the strangeness
> >> where we want to treat a PCollection with a single-field row as a
> >> PCollection with just that value instead.
> >
> >
> > This is part of what I was suggesting in my "Root schema is a logical
> type" alternative [1], except that the language about SDK-specific logical
> types is now obsolete. I'll update it to better reflect this alternative.
> > I do think at the very least we should just have one (list of field
> names -> field type pairs) that is re-used, which is what I did in my PR
> [2].
> >
> > [1]
> https://docs.google.com/document/d/1uu9pJktzT_O3DxGd1-Q2op4nRk4HekIZbzi-0oTAips/edit#heading=h.7570feur1qin
> > [2]
> https://github.com/apache/beam/pull/8853/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2L686
> >
> >>
> >>
> >> > Java also continues to have its own CustomSchemaCoder. This is
> parameterized by the schema as well as the to/from functions needed to make
> the Java API "nice."
> >> >
> >> > When the expansion service expands a Java PTransform for usage across
> languages, it will add a transform mapping the  PCollection with
> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
> Java can maintain the information needed to maintain its API (and Python
> can do the same), but there's no need to shove this information into the
> well-known portable representation.
> >> >
> >> > Brian, can you confirm that this was your proposal? If so, I like it.
> >>
> >> The major downside of this that I see is that it assumes that
> >> transparency is only needed at certain "boundaries" and everything
> >> between these boundaries is opaque. I think we'd be better served by a
> >> format where schemas are transparently represented throughout. For
> >> example, the "boundaries" between runner and SDK are not known at
> >> pipeline construction time, and we want the runner <-> SDK
> >> communication to understand the schemas to be able to use more
> >> efficient transport mechanisms (e.g. batches of arrow records). It may
> >> also be common for a pipeline in language X to invoke two transforms
> >> in language Y in succession (e.g. two SQL statements) in which case
> >> introducing two extra transforms in the expansion service would be
> >> wasteful. I also think we want to allow the flexibility for runners to
> >> swap out transforms as optimizations regardless of construction-time
> >> boundaries (e.g. implementing a projection natively, rather than
> >> outsourcing to the SDK).
> >>
> >> Are the to/from conversion functions the only extra information needed
> >> to make the Java APIs nice? If so, can they be attached to the
> >> operations themselves (where it seems they're actually needed/used),
> >> rather than to the schema/coder of the PCollection? Alternatively, I'd
> >> prefer this be opaque metadata attached to a transparent schema rather
> >> than making the whole schema opaque.
> >>
> >> > We've gone back and forth discussing abstracts for over a month now.
> I suggest that the next step should be to create a PR, and move discussion
> to that PR. Having actual code can often make discussion much more concrete.
> >>
> >> +1 to a PR, though I feel like there are fundamental high-level issues
> >> that are still not decided. (I suppose we should be open to throwing
> >> whole PRs away in that case.) There are certainly pieces that we'll
> >> know that we need (like the ability to serialize a row consistently in
> >> all languages) we can get in immediately.
> >>
> >> > Reuven
> >> >
> >> > On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >> >>
> >> >> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax <re...@google.com> wrote:
> >> >>>
> >> >>>
> >> >>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles <k...@apache.org>
> wrote:
> >> >>>>
> >> >>>> Can we choose a first step? I feel there's consensus around:
> >> >>>>
> >> >>>>  - the basic idea of what a schema looks like, ignoring logical
> types or SDK-specific bits
> >> >>>>  - the version of logical type which is a standardized URN+payload
> plus a representation
> >> >>>>
> >> >>>> Perhaps we could commit this and see what it looks like to try to
> use it?
> >> >>
> >> >>
> >> >> +1
> >> >>
> >> >>>>
> >> >>>> It also seems like there might be consensus around the idea of
> each of:
> >> >>>>
> >> >>>>  - a coder that simply encodes rows; its payload is just a schema;
> it is minimalist, canonical
> >> >>>>
> >> >>>>  - a coder that encodes a non-row using the serialization format
> of a row; this has to be a coder (versus Convert transforms) so that
> to/from row conversions can be elided when primitives are fused (just like
> to/from bytes is elided)
> >> >>
> >> >>
> >> >> So, to make it concrete, in the Beam protos we would have an
> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
> whose definition is in terms of URN + payload + components (+
> representation, for non-primitive types, some details TBD there). It could
> be deserialized into various different Coder instances (an SDK
> implementation detail) in an SDK depending on the type. One of the most
> important primitive field types is Row (aka Struct).
> >> >>
> >> >> We would define a byte encoding for each primitive type. We *could*
> choose to simply require that the encoding of any non-row primitive is the
> same as its encoding in a single-member row, but that's not necessary.
> >> >>
> >> >> In the short term, the window/timestamp/pane info would still live
> outside via an enclosing WindowCoder, as it does now, not blocking on a
> desirable but still-to-be-figured-out unification at that level.
> >> >>
> >> >> This seems like a good path forward.
> >> >>
> >> >>> Actually this doesn't make sense to me. I think from the
> portability perspective, all we have is schemas - the rest is just a
> convenience for the SDK. As such, I don't think it makes sense at all to
> model this as a Coder.
> >> >>
> >> >>
> >> >> Coder and Schemas are mutually exclusive on PCollections, and
> completely specify type information, so I think it makes sense to reuse
> this (as we're currently doing) until we can get rid of coders altogether.
> >> >>
> >> >> (At execution time, we would generalize the notion of a coder to
> indicate how *batches* of elements are encoded, not just how individual
> elements are encoded. Here we have the option of letting the runner pick
> depending on the use (e.g. elementwise for key lookups vs. arrow for bulk
> data channel transfer vs ???, possibly with parameters like "preferred
> batch size") or standardizing on one physical byte representation for all
> communication over the boundary.)
> >> >>
> >> >>>
> >> >>>
> >> >>>>
> >> >>>>
> >> >>>> Can we also just have both of these, with different URNs?
> >> >>>>
> >> >>>> Kenn
> >> >>>>
> >> >>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax <re...@google.com>
> wrote:
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>>>>>
> >> >>>>>> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles <k...@apache.org>
> wrote:
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> I believe the schema registry is a transient construction-time
> concept. I don't think there's any need for a concept of a registry in the
> portable representation.
> >> >>>>>>>
> >> >>>>>>>> I'd rather urn:beam:schema:logicaltype:javasdk not be used
> whenever one has (say) a Java POJO as that would prevent other SDKs from
> "understanding" it as above (unless we had a way of declaring it as "just
> an alias/wrapper").
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> I didn't understand the example I snipped, but I think I
> understand your concern here. Is this what you want? (a) something
> presented as a POJO in Java (b) encoded to a row, but still decoded to the
> POJO and (c) non-Java SDK knows that it is "just a struct" so it is safe to
> mess about with or even create new ones. If this is what you want it seems
> potentially useful, but also easy to live without. This can also be done
> entirely within the Java SDK via conversions, leaving no logical type in
> the portable pipeline.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> I'm imagining a world where someone defines a PTransform that
> takes a POJO for a constructor, and consumes and produces a POJO, and is
> now usable from Go with no additional work on the PTransform author's
> part.  But maybe I'm thinking about this wrong and the POJO <-> Row
> conversion is part of the @ProcessElement magic, not encoded in the schema
> itself.
> >> >>>>>
> >> >>>>>
> >> >>>>> The user's output would have to be explicitly schema. They would
> somehow have to tell Beam to infer a schema from the output POJO (e.g. one
> way to do this is to annotate the POJO with the @DefaultSchema
> annotation).  We don't currently magically turn a POJO into a schema unless
> we are asked to do so.
>
