[
https://issues.apache.org/jira/browse/BEAM-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078407#comment-16078407
]
Łukasz Gajowy edited comment on BEAM-1820 at 7/7/17 5:22 PM:
-------------------------------------------------------------
Below is a list of PTransforms that assume Source.getDefaultOutputCoder()
never returns null:
- UnboundedReadFromBoundedSource
- StreamingBoundedRead
- StreamingUnboundedRead
- BoundedReadFromUnboundedSource
- Read.Bounded
- Read.Unbounded
- StreamingUnboundedRead/ReadWithIds (DataflowRunner)
For the first four PTransforms, the Coder can be obtained during the expand()
call, from the PCollection returned by the Read.from() applied there. When I
modify them this way, all the tests pass. Below is an example of how I do
this, based on one of the classes to modify:
{code:title= UnboundedReadFromBoundedSource.java|borderStyle=solid}
...
// Cached in expand(); remains null until expand() has run.
private Coder<T> outputCoder;

@Override
public PCollection<T> expand(PBegin input) {
  PCollection<T> collection = input.getPipeline().apply(
      Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
  // Take the coder from the collection produced by Read.from(...).
  outputCoder = collection.getCoder();
  return collection;
}

@Override
protected Coder<T> getDefaultOutputCoder() {
  return outputCoder;
}
...
{code}
The other three PTransforms fail the tests when I try to make a similar change.
This is because the coder is not set on the PCollection.
Therefore I have the following questions:
# Is this way of obtaining the coder correct for the first four cases (this is
how I understood the task)? Is it OK that the outputCoder field remains null
until expand() executes?
# What about PTransforms in which the Read.from() in expand() does not set the
Coder on the PCollection?
# Also, there are places in Source classes (e.g. MicrobatchSource) that assume
a non-null default output coder, but I think correcting them is not part of
this task because, according to the issue description, it is the transform
code that should take care of the Coders. Do I understand correctly?
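To make the first question concrete, here is a minimal sketch of the caching pattern in isolation. It does not use the real Beam types: FakeCoder, FakeCollection and CachingTransform are hypothetical stand-ins invented for illustration, and the coder name is arbitrary. It only shows that a coder cached inside expand() is null for any caller that asks for it before expand() has run.

```java
import java.util.Objects;

// Hypothetical stand-ins only; none of these are the real Beam classes.
public class CoderCachingSketch {

    /** Stand-in for a Coder; it only carries a name. */
    static final class FakeCoder {
        final String name;

        FakeCoder(String name) {
            this.name = name;
        }
    }

    /** Stand-in for a PCollection whose coder has already been set. */
    static final class FakeCollection {
        private final FakeCoder coder;

        FakeCollection(FakeCoder coder) {
            this.coder = Objects.requireNonNull(coder);
        }

        FakeCoder getCoder() {
            return coder;
        }
    }

    /** Stand-in for a transform that caches the coder inside expand(). */
    static final class CachingTransform {
        private FakeCoder outputCoder; // null until expand() has run

        FakeCollection expand() {
            // Assumption: the inner read always sets a coder on its output.
            FakeCollection collection =
                new FakeCollection(new FakeCoder("inner-read-coder"));
            outputCoder = collection.getCoder();
            return collection;
        }

        /** Returns null if called before expand(). */
        FakeCoder getDefaultOutputCoder() {
            return outputCoder;
        }
    }

    public static void main(String[] args) {
        CachingTransform transform = new CachingTransform();
        boolean unsetBefore = transform.getDefaultOutputCoder() == null;
        System.out.println("before expand: " + (unsetBefore ? "null" : "set"));

        transform.expand();
        System.out.println("after expand: " + transform.getDefaultOutputCoder().name);
    }
}
```

Whether a null result before expand() is acceptable depends on whether anything calls getDefaultOutputCoder() earlier than that, which is exactly what question 1 asks.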
> Source.getDefaultOutputCoder() should be @Nullable
> --------------------------------------------------
>
> Key: BEAM-1820
> URL: https://issues.apache.org/jira/browse/BEAM-1820
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Eugene Kirpichov
> Assignee: Łukasz Gajowy
> Labels: easyfix, starter
>
> Source.getDefaultOutputCoder() returns a coder for elements produced by the
> source.
> However, the Source objects are nearly always hidden from the user and
> instead encapsulated in a transform. Often, an enclosing transform has a
> better idea of what coder should be used to encode these elements (e.g. a
> user supplied a Coder to that transform's configuration). In that case, it'd
> be good if Source.getDefaultOutputCoder() could just return null, and coder
> would have to be handled by the enclosing transform or perhaps specified on
> the output of that transform explicitly.
> Right now there's a bunch of code in the SDK and runners that assumes
> Source.getDefaultOutputCoder() returns non-null. That code would need to be
> fixed to instead use the coder set on the collection produced by
> Read.from(source).
> It all appears pretty easy to fix, so this is a good starter item.