This looks like it might be because the output coder cannot be determined.
It looks like the registry understands that it must build a KvCoder, but
cannot infer the coder for OutputT. More specifically, within the stack
trace, the following line occurs:

"Unable to provide a default Coder for java.lang.Object. Correct one of the
following root causes:"

CombineFn provides a `getDefaultOutputCoder(CoderRegistry, Coder<InputT>)`
method which may be suitable here for producing the coder for your outputs.

(I can produce a very similar stack trace:
https://gist.github.com/tgroh/04d4b638e7fabf8a03187760ddb26eef)

On Fri, Apr 7, 2017 at 9:46 PM, Aviem Zur <aviem...@gmail.com> wrote:

> I wasn't able to reproduce the issue you're experiencing.
> I've created a gist with an example that works and is similar to what you
> have described.
> Please help us make tweaks to the gist reproduce your problem:
> https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0
>
> On Fri, Apr 7, 2017 at 7:25 PM Paul Gerver <pfger...@gmail.com> wrote:
>
> > Yes, the pipeline is quite small:
> >
> >         pipeline.apply("source",
> >                 Read.from(new CustomSource())).setCoder(
> CustomSource.coder)
> >         .apply("GlobalCombine", Combine.globally(new
> > CustomCombineFn())).setCoder(CustomTuple.coder);
> >
> >
> > The InputT is not the same as OutputT, so the input coder can't be used.
> >
> > On 2017-04-07 08:58 (-0500), Aviem Zur <aviem...@gmail.com> wrote:
> > > Have you set the coder for your input PCollection? The one on which you
> > > perform the Combine?
> > >
> > > On Fri, Apr 7, 2017 at 4:24 PM Paul Gerver <pfger...@gmail.com> wrote:
> > >
> > > > Hello All,
> > > >
> > > > I'm trying to test out a Combine.Globally transform which takes in a
> > small
> > > > custom class (CustomA) and outputs a secondary custom class
> (CustomB).
> > I
> > > > have set the coder for the resulting PCollection<CustomB>, but Beam
> is
> > > > arguing that a coder for a KV type is missing (see output at bottom).
> > > >
> > > > Since this a global combine, the input nor the output is of KV type,
> > so I
> > > > decided to take a look at the Combine code. Since
> > Combine.Globally.expand()
> > > > performs a perKeys and groupedValues underneath the covers, but
> > requires
> > > > making an intermediate PCollection KV<Void, OutputT> which--according
> > to
> > > > the docs--is inferred from the CombineFn.
> > > >
> > > > I believe I could workaround this by registering a KvCoder with the
> > > > CoderRegistry, but that's not intuitive. Is there a better way to
> > address
> > > > this currently, or should something be added to the CombineFn area
> for
> > > > setting an output coder similar to PCollection.
> > > >
> > > >
> > > > Output:
> > > > Exception in thread "main" java.lang.IllegalStateException: Unable
> to
> > > > return a default Coder for
> > > >
> > > >
> > GlobalCombine/Combine.perKey(CustomTuple)/Combine.
> GroupedValues/ParDo(Anonymous).out
> > > > [Class]. Correct one of the following root causes:
> > > >   No Coder has been manually specified;  you may do so using
> > .setCoder().
> > > >   Inferring a Coder from the CoderRegistry failed: Unable to provide
> a
> > > > default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct
> > one of
> > > > the following root causes:
> > > >   Building a Coder using a registered CoderFactory failed: Cannot
> > provide
> > > > coder for parameterized type org.apache.beam.sdk.values.KV<K,
> OutputT>:
> > > > Unable to provide a default Coder for java.lang.Object. Correct one
> of
> > the
> > > > following root causes:
> > > >
> > > >
> > > > Stack:
> > > >         at
> > > >
> > > >
> > org.apache.beam.sdk.repackaged.com.google.common.
> base.Preconditions.checkState(Preconditions.java:174)
> > > >         at
> > > > org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
> > > >         at
> > > > org.apache.beam.sdk.values.PCollection.getCoder(
> PCollection.java:130)
> > > >         at
> > > >
> > > >
> > org.apache.beam.sdk.values.TypedPValue.finishSpecifying(
> TypedPValue.java:90)
> > > >         at
> > > >
> > > >
> > org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(
> TransformHierarchy.java:95)
> > > >         at
> > org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:386)
> > > >         at
> > org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:302)
> > > >         at
> > > > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> > > >         at
> > > >
> > org.apache.beam.sdk.transforms.Combine$Globally.
> expand(Combine.java:1460)
> > > >         at
> > > >
> > org.apache.beam.sdk.transforms.Combine$Globally.
> expand(Combine.java:1337)
> > > >         at
> > > >
> > org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> > > >         at
> > > >
> > org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:296)
> > > >         at
> > org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:388)
> > > >         at
> > org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:318)
> > > >         at
> > > > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
> > > >         at
> > > > org.iastate.edu.CombineTestPipeline.main(
> CombineTestPipeline.java:110)
> > > >
> > > >
> > > > Let me know. Thanks!
> > > > -Paul G
> > > >
> > > > --
> > > > *Paul Gerver*
> > > > pfger...@gmail.com
> > > >
> > >
> >
>

Reply via email to