On Fri, Jan 11, 2019 at 2:14 PM Kenneth Knowles <[email protected]> wrote:

> That clarifies a lot.
>
> One take is that a ParDo doesn't output to an existing PCollection. A
> ParDo returns a PCollection, which it creates and can put special
> conditions on
>

Yes, that's why I said we are verifying the ParDo's place in the graph.


> . Our Java code might make this a pain but that's conceptually what is
> going on. So perhaps the restriction is that if a ParDo has an
> OutputReceiver<Row> it should output a PCollection where setCoder(...) will
> reject types that don't have a (compatible) schema.
>

This is an interesting idea, and I like it. Can you suggest how this would
be done? Today ParDo creates its output as follows. If I wrap that, will I
break anything, or should everything just work?

PCollection.createPrimitiveOutputInternal(
        pipeline, windowingStrategy, isBounded, coders.get(outputTag))
    .setTypeDescriptor((TypeDescriptor) outputTag.getTypeDescriptor())
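
To make the question concrete, here is a minimal toy sketch of the "output
collection whose setCoder(...) rejects schema-less coders" idea. It
deliberately does not use Beam classes: ToyCoder, ToySchemaCoder,
ToyOpaqueCoder, ToyCollection and SchemaRequiredCollection are all
hypothetical stand-ins, and the String "schema" is only a placeholder. It is
meant to show the shape of the wrapping, not how Beam would actually
implement it.

```java
import java.util.Optional;

class SchemaCheckSketch {
    /** Hypothetical stand-in for a coder; a schema is present only for schema-aware coders. */
    interface ToyCoder {
        Optional<String> schema();
    }

    /** Hypothetical coder that carries a (placeholder) schema description. */
    static final class ToySchemaCoder implements ToyCoder {
        private final String schema;

        ToySchemaCoder(String schema) {
            this.schema = schema;
        }

        @Override
        public Optional<String> schema() {
            return Optional.of(schema);
        }
    }

    /** Hypothetical coder with no schema attached. */
    static final class ToyOpaqueCoder implements ToyCoder {
        @Override
        public Optional<String> schema() {
            return Optional.empty();
        }
    }

    /** Hypothetical stand-in for the output collection a ParDo creates. */
    static class ToyCollection {
        private ToyCoder coder;

        ToyCollection setCoder(ToyCoder coder) {
            this.coder = coder;
            return this;
        }

        ToyCoder getCoder() {
            return coder;
        }
    }

    /** Variant a ParDo could return when its DoFn declares OutputReceiver<Row>. */
    static final class SchemaRequiredCollection extends ToyCollection {
        @Override
        ToyCollection setCoder(ToyCoder coder) {
            // Fail while the pipeline is being built, not while it is running.
            if (!coder.schema().isPresent()) {
                throw new IllegalArgumentException(
                    "This output is written via OutputReceiver<Row>; the coder must have a schema");
            }
            return super.setCoder(coder);
        }
    }

    public static void main(String[] args) {
        ToyCollection out = new SchemaRequiredCollection();
        out.setCoder(new ToySchemaCoder("name:STRING")); // accepted
        try {
            out.setCoder(new ToyOpaqueCoder()); // no schema, so this is rejected
            System.out.println("unexpectedly accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected at construction time: " + e.getMessage());
        }
    }
}
```

In this toy version the check runs as soon as setCoder(...) is called, so a
schema-less coder is rejected during pipeline construction. It does not cover
the case where no coder is ever set explicitly and one is inferred later.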




>
> Kenn
>
> On Fri, Jan 11, 2019 at 2:00 PM Reuven Lax <[email protected]> wrote:
>
>> The verification is on the ParDo (or specifically the ParDo's place in
>> the graph), not on the PCollection.
>>
>> If you remember from the original schema discussion on this list, part of
>> the design was to allow Rows and user types to be used interchangeably when
>> schemas are around. For example, if you have a PCollection<UserType> pc,
>> it's legal to write:
>>
>> pc.apply(ParDo.of(new DoFn....
>>     @ProcessElement
>>     public void process(@Element Row row) {
>>     }
>> }));
>>
>> Practically our goal was to make schemas an integral part of the type
>> system, and not force users to write ParDos to convert to PCollection<Row>
>> whenever they want to use schema-based transforms; this seamless usability
>> is one thing that distinguishes our work on schemas from that of some other
>> systems. It also enables us to do things efficiently, as otherwise the cost
>> of constant conversions back and forth to Row would start to dominate.
>>
>> Of course this is only legal if the input PCollection has a schema, so we
>> verify that (today in expand()). The problem I'm dealing with is that we
>> also allow the same for OutputReceiver (it would somewhat defeat the
>> purpose to do this only on input and not on output). So you can have
>> something like:
>>
>> @ProcessElement
>> public void process(...., OutputReceiver<Row> output) {
>> }
>>
>> The problem I face today is that this is only truly legal if the
>> PCollection that the ParDo is outputting to has a schema, but we don't know
>> that until coder inference is done. Right now this means that if a mistake
>> is made, it's not caught until the ParDo is actually running - an
>> unfortunate situation. I would like to be able to detect these
>> OutputReceiver parameters and verify that they are correct before the
>> pipeline begins to run.
>>
>> Reuven
>>
>> On Fri, Jan 11, 2019 at 1:33 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> Can you elaborate a bit more? Maybe a specific code example? I'm a
>>> little bit concerned about this sort of global verification. If the
>>> PCollection gets passed around afterwards, new restrictions on what can be
>>> done with it are a pretty big deal.
>>>
>>> Kenn
>>>
>>> On Fri, Jan 11, 2019 at 12:58 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> My problem is exactly outputs. I want to verify schemas for any
>>>> OutputReceiver parameters, and I don't think I can do this in expand.
>>>>
>>>> The best idea I have so far is to create a new PipelineVisitor to do
>>>> this, and run that after the normal apply is done.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Jan 11, 2019 at 12:39 PM Kenneth Knowles <[email protected]>
>>>> wrote:
>>>>
>>>>> I believe that today all coders must be fully defined for all
>>>>> arguments to expand(). For the outputs, the ParDo outputting should be
>>>>> agnostic, no? The constraints on setCoder(...) are hoped to be enough to
>>>>> make sure nothing breaks.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Jan 11, 2019 at 10:41 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I want to be able to write a verification phase that asserts that
>>>>>> input and output schemas for all ParDos match up properly. The only
>>>>>> place I can see to do that today is in expand(); however, this does not
>>>>>> work, as Coders may not be fully known when expand() is called (remember
>>>>>> that Schemas are implemented as a special type of Coder today). For
>>>>>> example:
>>>>>>
>>>>>> p.apply(ParDo.of(new MyDoFn()))
>>>>>>   .setCoder(new FooCoder());
>>>>>>
>>>>>> FooCoder is not known yet when expand is called for the ParDo.
>>>>>>
>>>>>> Is there any place in Beam today where I could set up such a
>>>>>> verification pass?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>
