[ 
https://issues.apache.org/jira/browse/BEAM-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140777#comment-17140777
 ] 

Brian Hulette commented on BEAM-10143:
--------------------------------------

I checked and the expansion request does have ref_Windowing_Windowing_1 as a 
fixed window so I'm pretty sure the Java side is doing everything as it should. 
The issue seems to be due to python switching up the IDs on the windowing 
strategies and not adjusting the Java transforms.

It's also worth noting that ref_Windowing_Windowing_1 is the only  strategy in 
the expansion request and response, there are no global window strategies. This 
makes sense since the Java transfroms are only ever operating on fixed windows, 
but I'm wondering if this is actually the cause of the error. If python is 
generating the ids for the windowing strategies on the fly, then it could 
generate two different ids for the same strategy: one for the expansion 
request's scope, and one for the final pipeline's scope.

> ClassCastException in GROUP BY with non-global window
> -----------------------------------------------------
>
>                 Key: BEAM-10143
>                 URL: https://issues.apache.org/jira/browse/BEAM-10143
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql, sdk-py-core
>            Reporter: Maximilian Michels
>            Priority: P1
>
> I'm using the SqlTransform as an external transform from within a Python
> pipeline. I apply windowing before a GROUP BY query as mentioned as the first 
> option in 
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/:
> {code:python}
>   input
>       | "Window" >> beam.WindowInto(window.FixedWindows(30))
>       | "Aggregate" >>
>       SqlTransform("""Select field, count(field) from PCOLLECTION
>                       WHERE ...
>                     GROUP BY field
>                    """)
> {code}
> This results in an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow
>       at 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow$Coder.encode(GlobalWindow.java:59)
>       at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>       at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:588)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>       at 
> org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:154)
>       at 
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>       at 
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to