adude3141 commented on a change in pull request #15955:
URL: https://github.com/apache/beam/pull/15955#discussion_r753827901
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -1067,8 +1063,11 @@ private void checkForFailures(String message) throws IOException {
     @Teardown
     public void teardown() throws Exception {
-      if (producer != null && producer.getOutstandingRecordsCount() > 0) {
-        producer.flushSync();
+      if (producer != null) {
+        if (producer.getOutstandingRecordsCount() > 0) {
+          producer.flushSync();
+        }
+        producer.destroy();
       }
       producer = null;
Review comment:
@aromanenko-dev I stumbled across this intended change, and since you
reviewed the [originating PR](https://github.com/apache/beam/pull/9640) that
introduced this code, I am wondering how this works at all.
IIUC the `producer` field is static on `KinesisWriterFn`. Couldn't setting it
to `null` in the @Teardown of one DoFn instance cause random NPEs in another
DoFn instance that is currently running @ProcessElement or @FinishBundle? It
seems this only works because the write that unsets `producer` happens not to
cross a memory fence, so other threads never see it?
I probably misunderstand the inner workings here. What are your thoughts on
this?
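
To make the concern concrete, here is a minimal sketch of the interleaving I have in mind. It replays the steps sequentially instead of using real threads, so the outcome is deterministic. `FakeProducer` and `WriterFn` are hypothetical stand-ins, not the actual KinesisIO classes, and the sketch assumes the producer really is held in a static field shared by all DoFn instances.

```java
// Hypothetical stand-ins for illustration only; not the actual KinesisIO code.
public class SharedProducerSketch {

  /** Stand-in for the Kinesis producer; it only counts the records it receives. */
  static class FakeProducer {
    int sent = 0;

    void addUserRecord(String record) {
      sent++;
    }
  }

  /** Stand-in for a DoFn whose producer is shared by all instances via a static field. */
  static class WriterFn {
    static FakeProducer producer;

    void setup() {
      if (producer == null) {
        producer = new FakeProducer();
      }
    }

    void processElement(String record) {
      // Throws NullPointerException if another instance already ran teardown().
      producer.addUserRecord(record);
    }

    void teardown() {
      // Clears the field for every instance, not just this one.
      producer = null;
    }
  }

  /** Replays one unlucky interleaving; returns true if instance b hits an NPE. */
  static boolean secondInstanceFailsAfterFirstTeardown() {
    WriterFn a = new WriterFn();
    WriterFn b = new WriterFn();
    a.setup();
    b.setup();
    b.processElement("r1"); // fine: the shared producer is still set
    a.teardown();           // instance a finishes and nulls the shared field
    try {
      b.processElement("r2");
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(secondInstanceFailsAfterFirstTeardown());
  }
}
```

With real threads the failure would depend on timing and on when the `null` write becomes visible to the other thread, which may be why it has not shown up in practice.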