Hello All,
I'm trying to test out a Combine.Globally transform which takes in a small
custom class (CustomA) and outputs a secondary custom class (CustomB). I
have set the coder for the resulting PCollection<CustomB>, but Beam is
arguing that a coder for a KV type is missing (see output at bottom).
Since this is a global combine, neither the input nor the output is of KV type, so I
decided to take a look at the Combine code. Combine.Globally.expand() performs a
Combine.perKey (and, inside that, Combine.GroupedValues) under the covers, which
requires making an intermediate PCollection<KV<Void, OutputT>> whose coder,
according to the docs, is inferred from the CombineFn.
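To make the paragraph above concrete, here is a plain-Java model (no Beam dependency) of a global combine expressed as a per-key combine over a single null key. GlobalCombineModel, SimpleCombineFn and the method names are hypothetical, and the model ignores windowing, coders and distribution; the intermediate Map<Void, OutputT> only plays the role of the PCollection<KV<Void, OutputT>> whose coder Beam has to find.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (no Beam) of a global combine implemented as a per-key
 * combine over one null "Void" key. All names here are hypothetical.
 */
public class GlobalCombineModel {

  /** Hypothetical stand-in for a CombineFn: folds a group of inputs into one output. */
  public interface SimpleCombineFn<InputT, OutputT> {
    OutputT combine(List<InputT> inputs);
  }

  /** Per-key combine: group values by key, then combine each group separately. */
  static <K, InputT, OutputT> Map<K, OutputT> combinePerKey(
      List<Map.Entry<K, InputT>> keyed, SimpleCombineFn<InputT, OutputT> fn) {
    Map<K, List<InputT>> grouped = new HashMap<>(); // HashMap permits the null key
    for (Map.Entry<K, InputT> kv : keyed) {
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    Map<K, OutputT> combined = new HashMap<>();
    for (Map.Entry<K, List<InputT>> group : grouped.entrySet()) {
      combined.put(group.getKey(), fn.combine(group.getValue()));
    }
    return combined;
  }

  /** Global combine: key every element with null, combine per key, then drop the key. */
  public static <InputT, OutputT> OutputT combineGlobally(
      List<InputT> inputs, SimpleCombineFn<InputT, OutputT> fn) {
    List<Map.Entry<Void, InputT>> keyed = new ArrayList<>();
    for (InputT in : inputs) {
      keyed.add(new AbstractMap.SimpleEntry<Void, InputT>(null, in));
    }
    // Intermediate Map<Void, OutputT>: the analogue of Beam's
    // PCollection<KV<Void, OutputT>>, i.e. the collection that needs a coder.
    Map<Void, OutputT> perKey = combinePerKey(keyed, fn);
    if (perKey.isEmpty()) {
      // Empty input: mimic Beam's default of emitting the fn's result for no inputs.
      return fn.combine(Collections.<InputT>emptyList());
    }
    return perKey.get(null);
  }
}
```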
I believe I could work around this by registering a KvCoder with the
CoderRegistry, but that's not intuitive. Is there a better way to address
this currently, or should something be added to CombineFn for setting an
output coder, similar to PCollection.setCoder()?
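One way to give the CombineFn its own output coder is sketched below, assuming a Beam 2.x SDK, where Combine.CombineFn exposes overridable getDefaultOutputCoder(CoderRegistry, Coder<InputT>) and getAccumulatorCoder(CoderRegistry, Coder<InputT>); the pre-2.0 SDK in the stack trace may behave differently. CustomA, CustomB, Acc and SumToCustomB are hypothetical stand-ins for the classes in this post, and SerializableCoder is just one possible coder choice.

```java
import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class CombineCoderSketch {

  /** Hypothetical stand-in for the post's CustomA input type. */
  public static class CustomA implements Serializable {
    public final int value;
    public CustomA(int value) { this.value = value; }
  }

  /** Hypothetical stand-in for the post's CustomB output type. */
  public static class CustomB implements Serializable {
    public final int total;
    public CustomB(int total) { this.total = total; }
  }

  /** Hypothetical mutable accumulator so addInput can update it in place. */
  public static class Acc implements Serializable {
    int sum;
  }

  /** Sums CustomA.value into a CustomB and declares its coders explicitly. */
  public static class SumToCustomB extends Combine.CombineFn<CustomA, Acc, CustomB> {
    @Override public Acc createAccumulator() { return new Acc(); }

    @Override public Acc addInput(Acc acc, CustomA input) {
      acc.sum += input.value;
      return acc;
    }

    @Override public Acc mergeAccumulators(Iterable<Acc> accs) {
      Acc merged = new Acc();
      for (Acc a : accs) { merged.sum += a.sum; }
      return merged;
    }

    @Override public CustomB extractOutput(Acc acc) { return new CustomB(acc.sum); }

    // Explicit accumulator coder instead of inferring one from the type parameters.
    @Override public Coder<Acc> getAccumulatorCoder(CoderRegistry registry, Coder<CustomA> inputCoder) {
      return SerializableCoder.of(Acc.class);
    }

    // Explicit output coder; the combine transforms can build the intermediate
    // KV<Void, CustomB> coder from this instead of inferring OutputT.
    @Override public Coder<CustomB> getDefaultOutputCoder(CoderRegistry registry, Coder<CustomA> inputCoder) {
      return SerializableCoder.of(CustomB.class);
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<CustomA> input = p.apply(
        Create.of(Arrays.asList(new CustomA(1), new CustomA(2), new CustomA(3)))
            .withCoder(SerializableCoder.of(CustomA.class)));
    // No KvCoder registration: the fn itself supplies the output coder.
    PCollection<CustomB> total = input.apply("GlobalCombine", Combine.globally(new SumToCustomB()));
    p.run().waitUntilFinish();
  }
}
```

Overriding these methods keeps the coder choice next to the CombineFn, which is close to the "output coder on the CombineFn" idea asked about above. Another option is to make a coder for CustomB itself discoverable (for example with a @DefaultCoder annotation on the class or CoderRegistry.registerCoderForClass), which also avoids registering a KvCoder by hand.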
Output:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for GlobalCombine/Combine.perKey(CustomTuple)/Combine.GroupedValues/ParDo(Anonymous).out [Class]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct one of the following root causes:
Building a Coder using a registered CoderFactory failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:
Stack:
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:174)
    at org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:130)
    at org.apache.beam.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:90)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:95)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:386)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:302)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1460)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1337)
    at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
    at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:296)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:388)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:318)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
    at org.iastate.edu.CombineTestPipeline.main(CombineTestPipeline.java:110)
Let me know. Thanks!
-Paul G
--
*Paul Gerver*