[ 
https://issues.apache.org/jira/browse/BEAM-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078407#comment-16078407
 ] 

Łukasz Gajowy edited comment on BEAM-1820 at 7/7/17 5:21 PM:
-------------------------------------------------------------

Below is a list of PTransforms that assume Source.getDefaultOutputCoder() 
never returns null:

- UnboundedReadFromBoundedSource
- StreamingBoundedRead
- StreamingUnboundedRead
- BoundedReadFromUnboundedSource
- Read.Bounded 
- Read.Unbounded
- StreamingUnboundedRead/ReadWithIds (DataflowRunner)

The coder in the first four PTransforms can be obtained during the expand() 
call from the PCollection returned by the Read.from() applied there. When I 
modify them this way, all the tests pass. Below is an example of how I do 
this (based on UnboundedReadFromBoundedSource):

{code:title=Bar.java|borderStyle=solid}
  // Captured in expand(); stays null until expand() has run.
  private Coder<T> outputCoder;

  @Override
  public PCollection<T> expand(PBegin input) {
    PCollection<T> collection = input.getPipeline().apply(
        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));

    // Reuse the coder that the inner Read set on its output collection.
    outputCoder = collection.getCoder();

    return collection;
  }

  @Override
  protected Coder<T> getDefaultOutputCoder() {
    return outputCoder;
  }
{code}
The other three PTransforms fail the tests when I try to make a similar change. 
This is because the coder is not set on the PCollection in those cases. 

Therefore I have the following questions: 

# Is this way of obtaining the coder correct in the first four cases (this is 
how I understood the task)? Is it OK that the outputCoder field remains 
null until expand() has executed?
# What about PTransforms in which the Read.from() in expand() does not set 
the Coder on the PCollection? 
# Also, there are places in Source classes (e.g. MicrobatchSource) that assume 
a non-null default output coder, but I think correcting them is not part 
of this task because, according to the issue description, it is the transform 
code that should take care of the Coders. Do I understand correctly?


> Source.getDefaultOutputCoder() should be @Nullable
> --------------------------------------------------
>
>                 Key: BEAM-1820
>                 URL: https://issues.apache.org/jira/browse/BEAM-1820
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Łukasz Gajowy
>              Labels: easyfix, starter
>
> Source.getDefaultOutputCoder() returns a coder for elements produced by the 
> source.
> However, the Source objects are nearly always hidden from the user and 
> instead encapsulated in a transform. Often, an enclosing transform has a 
> better idea of what coder should be used to encode these elements (e.g. a 
> user supplied a Coder to that transform's configuration). In that case, it'd 
> be good if Source.getDefaultOutputCoder() could just return null, and coder 
> would have to be handled by the enclosing transform or perhaps specified on 
> the output of that transform explicitly.
> Right now there's a bunch of code in the SDK and runners that assumes 
> Source.getDefaultOutputCoder() returns non-null. That code would need to be 
> fixed to instead use the coder set on the collection produced by 
> Read.from(source).
> It all appears pretty easy to fix, so this is a good starter item.
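
A minimal sketch of the coder handling the issue description above asks for, as I read it. It does not use Beam: FakeSource and resolveCoder are hypothetical names, coders are represented as plain strings, and both the precedence order (transform first, then source) and the exception when neither is set are my assumptions.

```java
/** Hypothetical illustration, not Beam code: coders are modelled as plain strings. */
final class CoderFallbackSketch {

  /** Stand-in for a source whose default output coder may be null. */
  interface FakeSource {
    String getDefaultOutputCoder(); // may return null
  }

  /**
   * Picks the coder for a read. Assumed order: the coder configured on the
   * enclosing transform wins, then the source's default, otherwise fail.
   */
  static String resolveCoder(String transformCoder, FakeSource source) {
    if (transformCoder != null) {
      return transformCoder;
    }
    String fromSource = source.getDefaultOutputCoder();
    if (fromSource != null) {
      return fromSource;
    }
    throw new IllegalStateException(
        "No coder configured on the transform and the source provides none");
  }

  public static void main(String[] args) {
    FakeSource sourceWithoutCoder = () -> null;
    System.out.println(resolveCoder("user-coder", sourceWithoutCoder));
  }
}
```

The point of the sketch is only that a null from the source is a normal case that the enclosing transform has to handle explicitly.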



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
