Patricio Navarro created BEAM-13202:
---------------------------------------
Summary: Add Coder to CountIfFn.Accum
Key: BEAM-13202
URL: https://issues.apache.org/jira/browse/BEAM-13202
Project: Beam
Issue Type: Improvement
Components: dsl-sql, runner-dataflow
Affects Versions: 2.33.0
Environment: GCP Dataflow
Reporter: Patricio Navarro
When using
{code:java}
org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf
{code}
with dataflow-runner I got the following issue:
{noformat}
[WARNING]
java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for
the accumulator
at
org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform
(PipelineTranslation.java:78)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500
(TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit
(TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto
(PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run
(DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run
(DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at
com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline
(PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main
(PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
at
org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder
(CombineTranslation.java:207)
at
org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate
(CombineTranslation.java:179)
at
org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate
(PTransformTranslation.java:438)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto
(PTransformTranslation.java:248)
at
org.apache.beam.runners.core.construction.SdkComponents.registerPTransform
(SdkComponents.java:175)
at
org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform
(PipelineTranslation.java:75)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500
(TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit
(TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto
(PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run
(DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run
(DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at
com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline
(PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main
(PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer
coder for type parameter AccumT
at org.apache.beam.sdk.coders.CoderRegistry.getCoder
(CoderRegistry.java:328)
at
org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder
(CombineFnBase.java:119)
at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder
(Combine.java:391)
at
org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder
(AggregationCombineFnAdapter.java:75)
at
org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder
(CombineFns.java:430)
at
org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder
(SchemaAggregateFn.java:335)
at
org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder
(CombineTranslation.java:204)
at
org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate
(CombineTranslation.java:179)
at
org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate
(PTransformTranslation.java:438)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto
(PTransformTranslation.java:248)
at
org.apache.beam.runners.core.construction.SdkComponents.registerPTransform
(SdkComponents.java:175)
at
org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform
(PipelineTranslation.java:75)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500
(TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit
(TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto
(PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run
(DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run
(DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at
com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline
(PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main
(PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 57.006 s
[INFO] Finished at: 2021-10-25T13:54:23Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java
(default-cli) on project connected-stories-analytics-logger-store: An exception
occured while executing the Java class. java.io.IOException: Could not obtain a
Coder for the accumulator: Cannot infer coder for type parameter AccumT ->
[Help 1]
{noformat}
So I found out that
{code:java}
CountIfFn.Accum
{code}
on Beam seems not to have an associated Coder.
FYI - [~iemejia]
--
This message was sent by Atlassian Jira
(v8.20.1#820001)