[ 
https://issues.apache.org/jira/browse/BEAM-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142254#comment-17142254
 ] 

Brian Hulette commented on BEAM-10143:
--------------------------------------

In this case the fixed_windows windowing strategy originates in Python 
and is sent over to Java. It does seem to be referenced correctly in the Java 
expansion service; it's just that the IDs are mismatched in the final pipeline 
proto generated in Python.

I think I've found the underlying issue and it may be pretty serious (although 
it's possible there's a trivial fix I'm just not noticing). It looks like 
PipelineContext determines ids for components lazily, with _unique_ref(): 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/pipeline_context.py#L89

So the id of an individual component is sensitive to the order in which 
components were added.

The PipelineContext that's built and sent to the expansion service has a 
different set of components (or possibly even just a different ordering of the 
same components in some cases?) than the set of components used when building 
the final pipeline for submission, so the same component can get a different ID 
in those two contexts. In fact I'd think it's likely this would occur unless the 
external transform is the first one added to a pipeline, in which case the 
component ids should be the same.
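
To illustrate the effect, here is a minimal sketch under simplifying 
assumptions. ToyContext, its get_id() method, and the component names are 
hypothetical stand-ins, not Beam's actual PipelineContext or _unique_ref() 
implementation. It only shows how a lazily assigned, counter-based id ends up 
depending on what was registered earlier in the same context:

```python
class ToyContext:
    """Hypothetical stand-in for a registry that assigns ids lazily."""

    def __init__(self):
        self._ids = {}      # component -> assigned id
        self._counter = 0   # shared counter across all components

    def get_id(self, component, label):
        # An id is assigned only the first time a component is seen, so it
        # depends on how many components were registered before it.
        if component not in self._ids:
            self._counter += 1
            self._ids[component] = "ref_%s_%d" % (label, self._counter)
        return self._ids[component]


# Context built for the expansion request: the windowing strategy happens
# to be the first component registered.
expansion_ctx = ToyContext()
id_in_expansion = expansion_ctx.get_id("fixed_windows_30", "Windowing")

# Context built for the final pipeline: other components are registered
# before the same windowing strategy.
final_ctx = ToyContext()
final_ctx.get_id("some_coder", "Coder")
final_ctx.get_id("global_windows", "Windowing")
id_in_final = final_ctx.get_id("fixed_windows_30", "Windowing")

print(id_in_expansion)  # ref_Windowing_1
print(id_in_final)      # ref_Windowing_3
```

In this toy model the same windowing strategy gets two different ids, so a 
reference that uses the id from the first context would not resolve to the 
same component in the second one. If the strategy were the first component 
registered in both contexts, the ids would match.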

> ClassCastException in GROUP BY with non-global window
> -----------------------------------------------------
>
>                 Key: BEAM-10143
>                 URL: https://issues.apache.org/jira/browse/BEAM-10143
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql, sdk-py-core
>            Reporter: Maximilian Michels
>            Priority: P1
>
> I'm using the SqlTransform as an external transform from within a Python
> pipeline. I apply windowing before a GROUP BY query, as described in the first 
> option in 
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/:
> {code:python}
>   input
>       | "Window" >> beam.WindowInto(window.FixedWindows(30))
>       | "Aggregate" >>
>       SqlTransform("""Select field, count(field) from PCOLLECTION
>                       WHERE ...
>                     GROUP BY field
>                    """)
> {code}
> This results in an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow
>       at 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow$Coder.encode(GlobalWindow.java:59)
>       at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>       at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:588)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>       at 
> org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:154)
>       at 
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>       at 
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> {noformat}


