sshahar1 opened a new issue, #21882:
URL: https://github.com/apache/beam/issues/21882
### What happened?
I have a pipeline that reads data, windows it, groups and aggregates it, and writes the results to a relational database. The code (in Kotlin) is below:
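```kotlin
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.io.jdbc.JdbcIO
import org.apache.beam.sdk.transforms.Filter
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.Watch
import org.apache.beam.sdk.transforms.windowing.AfterWatermark
import org.apache.beam.sdk.transforms.windowing.FixedWindows
import org.apache.beam.sdk.transforms.windowing.Window
import org.joda.time.Duration

pipeline
    .apply(
        "WaitForNewFiles",
        FileIO.match()
            .filepattern(inputPath)
            .continuously(
                // Check for new files every 2 minutes
                Duration.standardMinutes(2),
                // Never stop checking for new files
                Watch.Growth.never()
            )
    )
    .apply("GetFiles", FileIO.readMatches())
    .apply(TextIO.readFiles())
    .apply("ToTuple", ParDo.of(toTuple))
    .apply(
        "WaitForLines",
        // One-minute fixed windows, fired once when the watermark passes the end of each window
        Window.into<T>(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO)
    )
    .apply("Filter", Filter.by(Matcher()))
    .apply("Flatten", ParDo.of(FlattenRecords()))
    .setCoder(AvroCoder.of(MyRecord::class.java))
    .apply(
        "Insert to database",
        JdbcIO.write<MyRecord>()
            .withDataSourceProviderFn(dsProvider)
            .withStatement(cloud_sql_rome_records_insert)
            .withPreparedStatementSetter(preparedSetter())
    )
```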
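For context on the `WaitForLines` step: a fixed window assigns each element to a half-open interval `[start, start + size)`, which is why every element downstream of it carries an `IntervalWindow`. The plain-Java toy model below (a hypothetical `FixedWindowSketch` class, not Beam's `FixedWindows` implementation; a zero window offset is assumed) sketches that assignment for the one-minute windows used here.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model, not Beam code: maps a timestamp to the fixed window containing it,
// assuming windows of a given size aligned to the epoch (zero offset).
final class FixedWindowSketch {
    // Hypothetical stand-in for an interval window [start, end).
    static final class Interval {
        final Instant start;
        final Instant end;

        Interval(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }
    }

    static Interval assign(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the window start correct for pre-epoch timestamps too.
        long start = ts - Math.floorMod(ts, sizeMs);
        return new Interval(Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMs));
    }

    public static void main(String[] args) {
        Interval w = assign(Instant.parse("2022-06-15T10:03:42Z"), Duration.ofMinutes(1));
        // prints 2022-06-15T10:03:00Z .. 2022-06-15T10:04:00Z
        System.out.println(w.start + " .. " + w.end);
    }
}
```

Because the interval is half-open, an element stamped exactly `10:04:00Z` falls into the next window, `[10:04, 10:05)`.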
The pipeline worked until we upgraded to Apache Beam 2.37; since the upgrade we have been getting many instances of the following exception:
```
java.lang.ClassCastException: class org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to class org.apache.beam.sdk.transforms.windowing.IntervalWindow (org.apache.beam.sdk.transforms.windowing.GlobalWindow and org.apache.beam.sdk.transforms.windowing.IntervalWindow are in unnamed module of loader 'app')
	at org.apache.beam.sdk.transforms.windowing.IntervalWindow$IntervalWindowCoder.registerByteSizeObserver(IntervalWindow.java:142)
	at org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:211)
	at org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:640)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
	at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
	at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:326)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:321)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$1.finish(JdbcIO.java:1613)
```
The code added to `JdbcIO` in 2.37 includes:
```java
@FinishBundle
public void finish(FinishBundleContext c) {
    if (outputList != null && outputList.size() > 0) {
        c.output(outputList, Instant.now(), GlobalWindow.INSTANCE);
    }
    outputList = null;
}
```
https://github.com/apache/beam/blob/4ffeae4d2b800f2df36d2ea2eab549f2204d5691/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1610
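The `finish` method in the excerpt assigns every flushed batch to `GlobalWindow.INSTANCE`, whatever windows the buffered elements were in. For contrast, here is a plain-Java toy model of a flush that remembers each element's window and emits one batch per window into that same window. `WindowAwareBatcher` and `Output` are hypothetical names; this is not Beam code and not necessarily how `JdbcIO` should or will be fixed, only a sketch of the idea.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Beam code: buffers elements per window so that a bundle-level
// flush can emit each batch into the window its elements came from.
final class WindowAwareBatcher<T, W> {
    // One flushed output: a batch of elements plus the window it is emitted into.
    static final class Output<T, W> {
        final List<T> batch;
        final W window;

        Output(List<T> batch, W window) {
            this.batch = batch;
            this.window = window;
        }
    }

    // Insertion-ordered so batches are emitted in the order windows were first seen.
    private final Map<W, List<T>> buffers = new LinkedHashMap<>();

    // Analogous to processing an element together with its window.
    void process(T element, W window) {
        buffers.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }

    // Analogous to a finish-bundle flush: one output per window seen in this bundle.
    List<Output<T, W>> finish() {
        List<Output<T, W>> out = new ArrayList<>();
        for (Map.Entry<W, List<T>> e : buffers.entrySet()) {
            out.add(new Output<>(e.getValue(), e.getKey()));
        }
        buffers.clear();
        return out;
    }

    public static void main(String[] args) {
        WindowAwareBatcher<String, String> b = new WindowAwareBatcher<>();
        b.process("a", "[10:03, 10:04)");
        b.process("b", "[10:04, 10:05)");
        b.process("c", "[10:03, 10:04)");
        // prints "[10:03, 10:04) -> [a, c]" then "[10:04, 10:05) -> [b]"
        for (Output<String, String> o : b.finish()) {
            System.out.println(o.window + " -> " + o.batch);
        }
    }
}
```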
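From what I understand, the interval window and the global window conflict: `finish` emits each batch into `GlobalWindow.INSTANCE`, but the output keeps the fixed-window strategy of its input, so its window coder is `IntervalWindowCoder` (the top frame of the stack trace), which cannot handle a `GlobalWindow`. The exception itself is very hard to interpret, so this should at least be addressed in the documentation.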
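A plain-Java toy model of the failure mode, assuming (as the stack trace suggests) that the coder used for an output's windows is chosen from its declared windowing strategy rather than from the windows elements actually carry at runtime. All names here (`WindowCoderMismatch`, `GlobalWin`, `IntervalWin`, `INTERVAL_CODER`, `encodeAll`) are hypothetical stand-ins, not Beam's `GlobalWindow`, `IntervalWindow`, or `IntervalWindowCoder`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model, not Beam code: a coder picked for interval windows is handed a
// global window and fails with a ClassCastException.
final class WindowCoderMismatch {
    interface Window {}

    static final class GlobalWin implements Window {
        static final GlobalWin INSTANCE = new GlobalWin();
    }

    static final class IntervalWin implements Window {
        final long startMs;
        final long endMs;

        IntervalWin(long startMs, long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    interface WindowCoder {
        String encode(Window w);
    }

    // Assumes every window it sees is an interval window and casts unconditionally.
    static final WindowCoder INTERVAL_CODER = w -> {
        IntervalWin iw = (IntervalWin) w; // ClassCastException for a GlobalWin
        return iw.startMs + ".." + iw.endMs;
    };

    // Every window of an emitted element goes through the coder chosen for the output.
    static List<String> encodeAll(List<? extends Window> windows, WindowCoder coder) {
        List<String> out = new ArrayList<>();
        for (Window w : windows) {
            out.add(coder.encode(w));
        }
        return out;
    }

    public static void main(String[] args) {
        // Element in a fixed (interval) window: encodes fine.
        System.out.println(encodeAll(Collections.singletonList(new IntervalWin(0, 60_000)), INTERVAL_CODER));
        try {
            // Element emitted into the global window, but the coder still expects intervals.
            encodeAll(Collections.singletonList(GlobalWin.INSTANCE), INTERVAL_CODER);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the reported stack trace");
        }
    }
}
```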
Thanks in advance
### Issue Priority
Priority: 2
### Issue Component
Component: io-java-jdbc