[ https://issues.apache.org/jira/browse/BEAM-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140765#comment-17140765 ]
Brian Hulette commented on BEAM-10143:
--------------------------------------
Running out of time for today, but I think I'm getting close to the bottom of
this.
There's a mismatch between the windowing strategies that are in the expansion
response and the windowing strategies in the final pipeline proto. In the
expansion response, ref_Windowing_Windowing_1 (the strategy used by all of the
transforms in the expanded SqlTransform) has fixed windows:
{code}
windowing_strategies {
  key: "ref_Windowing_Windowing_1"
  value {
    window_fn {
      urn: "beam:window_fn:fixed_windows:v1"
      payload: "\n\002\b\n\022\000"
    }
    merge_status: NON_MERGING
    window_coder_id: "ref_Coder_IntervalWindowCoder_2"
    trigger {
      default {
      }
    }
    accumulation_mode: DISCARDING
    output_time: END_OF_WINDOW
    closing_behavior: EMIT_ALWAYS
    OnTimeBehavior: FIRE_ALWAYS
  }
}
{code}
But in the final pipeline, ref_Windowing_Windowing_1 is the global window
strategy, used both before windowing is applied and after dropping back into
the global window for assert_that:
{code}
{
  key: "ref_Windowing_Windowing_1"
  value: {
    window_fn: {
      urn: "beam:window_fn:global_windows:v1"
    }
    merge_status: NON_MERGING
    window_coder_id: "ref_Coder_GlobalWindowCoder_2"
    trigger: {
      default: {}
    }
    accumulation_mode: DISCARDING
    output_time: END_OF_WINDOW
    closing_behavior: EMIT_ALWAYS
    OnTimeBehavior: FIRE_ALWAYS
  }
}
{code}
There is a fixed-window strategy in the final pipeline, but it's
ref_Windowing_Windowing_3, and all of the Java transforms still reference
ref_Windowing_Windowing_1.
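To make the collision concrete, here is a minimal sketch that flags strategy IDs defined in both places with different definitions. It assumes the two sets of windowing strategies have been reduced to plain dictionaries mapping strategy ID to window_fn URN; the helper name find_conflicting_ids and these dictionary shapes are hypothetical and not part of the Beam API.

```python
def find_conflicting_ids(expansion, pipeline):
    """Return IDs present in both maps whose definitions differ."""
    shared = expansion.keys() & pipeline.keys()
    return sorted(key for key in shared if expansion[key] != pipeline[key])


# Simplified stand-ins for the protos quoted above (ID -> window_fn URN).
expansion_strategies = {
    "ref_Windowing_Windowing_1": "beam:window_fn:fixed_windows:v1",
}
pipeline_strategies = {
    "ref_Windowing_Windowing_1": "beam:window_fn:global_windows:v1",
    "ref_Windowing_Windowing_3": "beam:window_fn:fixed_windows:v1",
}

conflicts = find_conflicting_ids(expansion_strategies, pipeline_strategies)
print(conflicts)
```

Any ID reported here means the same name refers to two different strategies, which matches the IntervalWindow/GlobalWindow cast failure in the report.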
CC: [~chamikara] [~heejong] Does the Python SDK need some logic for translating
the windowing strategies in the expansion service response?
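One possible shape for such translation logic, sketched with plain dictionaries rather than the real protos: when an ID from the expansion response clashes with a different strategy already in the pipeline, give it a fresh ID and rewrite the references. The function merge_with_renaming, the "_external_" suffix, and the PCollection name below are all hypothetical; this is an illustration of the idea, not how Beam implements it.

```python
def merge_with_renaming(pipeline, expansion, references):
    """Merge expansion strategies into the pipeline, renaming on conflict.

    `references` maps a (hypothetical) PCollection name to the strategy ID
    it uses inside the expansion response. Returns the merged strategy map
    and the references rewritten to the possibly renamed IDs.
    """
    merged = dict(pipeline)
    renames = {}
    for old_id, strategy in expansion.items():
        new_id = old_id
        suffix = 0
        # Keep the ID if it is unused or already maps to the same strategy;
        # otherwise try suffixed IDs until one is free or identical.
        while new_id in merged and merged[new_id] != strategy:
            suffix += 1
            new_id = f"{old_id}_external_{suffix}"
        merged[new_id] = strategy
        renames[old_id] = new_id
    rewritten = {name: renames.get(ref, ref) for name, ref in references.items()}
    return merged, rewritten


pipeline = {
    "ref_Windowing_Windowing_1": "beam:window_fn:global_windows:v1",
    "ref_Windowing_Windowing_3": "beam:window_fn:fixed_windows:v1",
}
expansion = {"ref_Windowing_Windowing_1": "beam:window_fn:fixed_windows:v1"}
references = {"SqlTransform/output": "ref_Windowing_Windowing_1"}

merged, rewritten = merge_with_renaming(pipeline, expansion, references)
print(rewritten["SqlTransform/output"], merged[rewritten["SqlTransform/output"]])
```

After the merge, the expanded output refers to a fixed-window strategy again, while the pipeline's own ref_Windowing_Windowing_1 remains the global window.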
> ClassCastException in GROUP BY with non-global window
> -----------------------------------------------------
>
> Key: BEAM-10143
> URL: https://issues.apache.org/jira/browse/BEAM-10143
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql, sdk-py-core
> Reporter: Maximilian Michels
> Priority: P1
>
> I'm using SqlTransform as an external transform from within a Python
> pipeline. I apply windowing before a GROUP BY query, as described in the first
> option at
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/:
> {code:python}
> (input
>  | "Window" >> beam.WindowInto(window.FixedWindows(30))
>  | "Aggregate" >>
>    SqlTransform("""Select field, count(field) from PCOLLECTION
>                    WHERE ...
>                    GROUP BY field
>                 """))
> {code}
> This results in an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to org.apache.beam.sdk.transforms.windowing.GlobalWindow
>     at org.apache.beam.sdk.transforms.windowing.GlobalWindow$Coder.encode(GlobalWindow.java:59)
>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:588)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>     at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>     at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:154)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> {noformat}