sshahar1 opened a new issue, #21882:
URL: https://github.com/apache/beam/issues/21882

   ### What happened?
   
   I have a pipeline that reads data, windows it, groups and aggregates it, and writes the results to a relational DB.
   The code (Kotlin) is below:
   ```
   pipeline
       .apply(
           "WaitForNewFiles",
           FileIO.match()
               .filepattern(inputPath)
               .continuously(
                   // Check for new files every 2 minutes
                   Duration.standardMinutes(2),
                   // Never stop checking for new files
                   Watch.Growth.never()
               )
       )
       .apply("GetFiles", FileIO.readMatches())
       .apply(TextIO.readFiles())
       .apply("ToTuple", ParDo.of(toTuple))
       .apply(
           "WaitForLines",
           Window.into<T>(FixedWindows.of(Duration.standardMinutes(1)))
               .triggering(AfterWatermark.pastEndOfWindow())
               .discardingFiredPanes()
               .withAllowedLateness(Duration.ZERO)
       ).apply("Filter", Filter.by(Matcher()))
           .apply("Flatten", ParDo.of(FlattenRecords()))
           .setCoder(AvroCoder.of(MyRecord::class.java))
           .apply("Insert to database",
               JdbcIO.write<MyRecord>()
                   .withDataSourceProviderFn(dsProvider)
                   .withStatement(cloud_sql_rome_records_insert)
                   .withPreparedStatementSetter(preparedSetter()))
   ```
   
   This worked until we upgraded to Apache Beam 2.37. After the upgrade we started getting the following exception (and a lot of them):
   ```
   java.lang.ClassCastException: class org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to class org.apache.beam.sdk.transforms.windowing.IntervalWindow (org.apache.beam.sdk.transforms.windowing.GlobalWindow and org.apache.beam.sdk.transforms.windowing.IntervalWindow are in unnamed module of loader 'app')
   at org.apache.beam.sdk.transforms.windowing.IntervalWindow$IntervalWindowCoder.registerByteSizeObserver(IntervalWindow.java:142)
   at org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:211)
   at org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:640)
   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
   at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
   at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
   at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:326)
   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:321)
   at org.apache.beam.sdk.io.jdbc.JdbcIO$1.finish(JdbcIO.java:1613)
   ```
   
   The new code introduced in JdbcIO in 2.37 includes:
   ```
   @FinishBundle
   public void finish(FinishBundleContext c) {
     if (outputList != null && outputList.size() > 0) {
       c.output(outputList, Instant.now(), GlobalWindow.INSTANCE);
     }
     outputList = null;
   }
   ```
   
https://github.com/apache/beam/blob/4ffeae4d2b800f2df36d2ea2eab549f2204d5691/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1610
   
   So, from what I understand, the `IntervalWindow` produced by my fixed windowing conflicts with the `GlobalWindow` that JdbcIO now uses when it emits output in `finish`.
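   
   A possible workaround, sketched here and not verified against 2.37, is to re-window into the global window just before the JDBC write, so that the window coder of the write's input matches the `GlobalWindow.INSTANCE` emitted in `finish`. The helper name `writeInGlobalWindow` is hypothetical; `GlobalWindows` and `Window.into` are existing Beam classes. This assumes nothing downstream of the write depends on the fixed windows.
   
   ```kotlin
   import org.apache.beam.sdk.io.jdbc.JdbcIO
   import org.apache.beam.sdk.transforms.windowing.GlobalWindows
   import org.apache.beam.sdk.transforms.windowing.Window
   import org.apache.beam.sdk.values.PCollection
   import org.apache.beam.sdk.values.PDone

   // Hypothetical helper: puts the records back into the global window
   // before handing them to an already configured JdbcIO.Write.
   fun <T> writeInGlobalWindow(
       records: PCollection<T>,
       write: JdbcIO.Write<T>
   ): PDone =
       records
           // Assumption: replacing the fixed windows here is acceptable,
           // because no grouping happens between this step and the write.
           .apply("RewindowToGlobal", Window.into<T>(GlobalWindows()))
           .apply("Insert to database", write)
   ```
   
   In the pipeline above this would take the output of the `Flatten` step (after `setCoder`) as `records` and the configured `JdbcIO.write<MyRecord>()` as `write`.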
   
   The exception is very vague, and this behavior should at least be addressed in the documentation.
   
   Thanks in advance
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: io-java-jdbc


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.