[ 
https://issues.apache.org/jira/browse/BEAM-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140703#comment-17140703
 ] 

Brian Hulette commented on BEAM-10143:
--------------------------------------

I took a look at this issue this morning. I had some difficulty replicating it 
because I couldn't get the type hints to work correctly; I may just not 
understand how type hints are supposed to work with windowing. Here's a test 
that replicates the error:

{code:python}
  def test_windowing_before_sql(self):
    with TestPipeline() as p:
      out = (
          p
          | beam.Create([
              beam.window.TimestampedValue(SimpleRow(1, "foo", 1.), 5),
              beam.window.TimestampedValue(SimpleRow(2, "bar", 2.), 15),
              beam.window.TimestampedValue(SimpleRow(3, "baz", 3.), 25)
          ])
          | beam.WindowInto(beam.window.FixedWindows(10))
          | beam.Map(lambda tv: tv).with_output_types(SimpleRow)
          | SqlTransform("SELECT COUNT(*) as `count` FROM PCOLLECTION"))
      assert_that(out, equal_to([(1,), (1,), (1,)]))
{code}

Note the odd identity Map transform, which is there only to register the 
SimpleRow type hint.

That's an orthogonal issue though. I'll keep digging to see why this raises the 
Java error.
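
For reference, here is a rough, Beam-free sketch of why the test expects three 
rows of (1,). It assumes fixed windows of size 10 with offset 0 and half-open 
intervals [start, end); the fixed_window helper is hypothetical and only 
mimics the window assignment, it is not a Beam API:

```python
# Hypothetical helper, not part of Beam: mimics how fixed windows of a given
# size (offset 0 assumed) bucket timestamps into half-open intervals.
from collections import Counter


def fixed_window(timestamp, size):
    """Return the (start, end) interval that contains `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


# Timestamps attached to the three SimpleRow elements in the test above.
timestamps = [5, 15, 25]

# COUNT(*) is evaluated per window, so tally the elements in each window.
counts_per_window = Counter(fixed_window(ts, 10) for ts in timestamps)

for window, count in sorted(counts_per_window.items()):
    print(window, count)
```

Each timestamp lands in its own window, so a per-window COUNT(*) should yield 
1 three times, which is what the assert_that in the test checks.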

> ClassCastException in GROUP BY with non-global window
> -----------------------------------------------------
>
>                 Key: BEAM-10143
>                 URL: https://issues.apache.org/jira/browse/BEAM-10143
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql, sdk-py-core
>            Reporter: Maximilian Michels
>            Priority: P1
>
> I'm using the SqlTransform as an external transform from within a Python
> pipeline. I apply windowing before a GROUP BY query, which is the first 
> option described in 
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/:
> {code:python}
>   input
>       | "Window" >> beam.WindowInto(window.FixedWindows(30))
>       | "Aggregate" >>
>       SqlTransform("""Select field, count(field) from PCOLLECTION
>                       WHERE ...
>                     GROUP BY field
>                    """)
> {code}
> This results in an exception:
> {noformat}
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to org.apache.beam.sdk.transforms.windowing.GlobalWindow
>       at org.apache.beam.sdk.transforms.windowing.GlobalWindow$Coder.encode(GlobalWindow.java:59)
>       at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>       at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:588)
>       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>       at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>       at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:154)
>       at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>       at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>       at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
>       at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
