[
https://issues.apache.org/jira/browse/BEAM-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142254#comment-17142254
]
Brian Hulette commented on BEAM-10143:
--------------------------------------
In this case the fixed_windows windowing strategy originates in Python
and is sent over to Java. It does seem to be referenced correctly in the Java
expansion service; the IDs are only mismatched in the final pipeline
proto generated in Python.
I think I've found the underlying issue, and it may be pretty serious (although
it's possible there's a trivial fix I'm just not noticing). It looks like
PipelineContext determines ids for components lazily, with _unique_ref():
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/pipeline_context.py#L89
So the id of an individual component is sensitive to the order in which
components were added.
The PipelineContext that's built and sent to the expansion service has a
different set of components (or possibly even just a different ordering of the
same components in some cases?) than the set of components present when
building the final pipeline for submission, so the same component can get a
different ID in those two contexts. In fact I'd think it's likely this would
occur unless the external transform is the first one added to a pipeline, in
which case the component ids should be the same.
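To make the ordering problem concrete, here is a toy sketch. It is not Beam's
actual PipelineContext: ToyContext and get_id are made-up names, and the id
format is only an assumption for illustration. It just shows how lazily
assigned, counter-based ids give the same component different ids in two
contexts that saw components in a different order.

```python
class ToyContext:
    """Hypothetical stand-in for a component map that assigns ids lazily."""

    def __init__(self):
        self._obj_to_id = {}
        self._counter = 0

    def get_id(self, obj, label):
        # Assign an id the first time an object is seen; reuse it afterwards.
        if obj not in self._obj_to_id:
            self._counter += 1
            self._obj_to_id[obj] = "ref_%s_%d" % (label, self._counter)
        return self._obj_to_id[obj]


# Plain strings stand in for windowing strategy objects.
fixed_windows = "FixedWindows(30)"
global_windows = "GlobalWindows()"

# Context sent to the expansion service: only the fixed windowing is registered.
expansion_ctx = ToyContext()
id_in_expansion = expansion_ctx.get_id(fixed_windows, "Windowing")

# Context for the final pipeline: another strategy happens to be registered first.
final_ctx = ToyContext()
final_ctx.get_id(global_windows, "Windowing")
id_in_final = final_ctx.get_id(fixed_windows, "Windowing")

print(id_in_expansion)  # ref_Windowing_1
print(id_in_final)      # ref_Windowing_2
```

In this toy, an id handed out by one context names a different strategy (or
nothing at all) in the other, which is the kind of mismatch described above.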
> ClassCastException in GROUP BY with non-global window
> -----------------------------------------------------
>
> Key: BEAM-10143
> URL: https://issues.apache.org/jira/browse/BEAM-10143
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql, sdk-py-core
> Reporter: Maximilian Michels
> Priority: P1
>
> I'm using the SqlTransform as an external transform from within a Python
> pipeline. I apply windowing before a GROUP BY query as mentioned as the first
> option in
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/:
> {code:python}
> (input
>  | "Window" >> beam.WindowInto(window.FixedWindows(30))
>  | "Aggregate" >>
>    SqlTransform("""Select field, count(field) from PCOLLECTION
>                 WHERE ...
>                 GROUP BY field
>                 """))
> {code}
> This results in an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to org.apache.beam.sdk.transforms.windowing.GlobalWindow
>   at org.apache.beam.sdk.transforms.windowing.GlobalWindow$Coder.encode(GlobalWindow.java:59)
>   at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>   at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:588)
>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>   at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>   at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:154)
>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>   at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
>   at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   ... 1 more
> {noformat}