Patricio Navarro created BEAM-13202:
---------------------------------------

             Summary: Add Coder to CountIfFn.Accum
                 Key: BEAM-13202
                 URL: https://issues.apache.org/jira/browse/BEAM-13202
             Project: Beam
          Issue Type: Improvement
          Components: dsl-sql, runner-dataflow
    Affects Versions: 2.33.0
         Environment: GCP Dataflow
            Reporter: Patricio Navarro


When using 
{code:java}
org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf 
{code}
with the Dataflow runner, pipeline submission fails with the following exception:
{noformat}
[WARNING] 
java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:78)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT
    at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328)
    at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119)
    at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391)
    at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75)
    at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430)
    at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  57.006 s
[INFO] Finished at: 2021-10-25T13:54:23Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]
{noformat}

Looking into it, I found that 
{code:java}
CountIfFn.Accum
{code}
does not appear to have an associated Coder in Beam, so no coder can be inferred for the accumulator type (AccumT).
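
For illustration, here is a minimal, Beam-free sketch of what a Coder for the accumulator would have to do: write the accumulator's state to bytes and read it back. The Accum class below is a hypothetical stand-in, and its field names (isExpressionFalse, countIfResult) are assumptions rather than a copy of the Beam source. In Beam itself, logic like this would presumably live in a CustomCoder<Accum> subclass (its encode/decode methods) returned from an override of getAccumulatorCoder, the call that fails in the trace above; that wiring is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class AccumCodecSketch {

  /** Hypothetical stand-in for CountIfFn.Accum; the field names are assumptions. */
  static class Accum {
    boolean isExpressionFalse = true;
    long countIfResult = 0L;
  }

  /** Serializes the accumulator state: one boolean followed by one long. */
  static byte[] encode(Accum value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeBoolean(value.isExpressionFalse);
    out.writeLong(value.countIfResult);
    out.flush();
    return bytes.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static Accum decode(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    Accum value = new Accum();
    value.isExpressionFalse = in.readBoolean();
    value.countIfResult = in.readLong();
    return value;
  }

  public static void main(String[] args) throws IOException {
    Accum original = new Accum();
    original.isExpressionFalse = false;
    original.countIfResult = 42L;

    // Round-trip the accumulator through its byte representation.
    Accum copy = decode(encode(original));
    System.out.println(copy.isExpressionFalse + " " + copy.countIfResult); // prints "false 42"
  }
}
```

If the accumulator were made Serializable instead, SerializableCoder.of(...) might be sufficient; either way, the improvement amounts to giving the accumulator type an explicit coder.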

FYI - [~iemejia]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
