adude3141 commented on a change in pull request #15955: URL: https://github.com/apache/beam/pull/15955#discussion_r753827901
########## File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java ########## @@ -1067,8 +1063,11 @@ private void checkForFailures(String message) throws IOException { @Teardown public void teardown() throws Exception { - if (producer != null && producer.getOutstandingRecordsCount() > 0) { - producer.flushSync(); + if (producer != null) { + if (producer.getOutstandingRecordsCount() > 0) { + producer.flushSync(); + } + producer.destroy(); } producer = null; Review comment: @aromanenko-dev After stumbling across this intended change, and as you reviewed the [originating MR](https://github.com/apache/beam/pull/9640) where the code is originating I am wondering, how this is working at all. Iiuc this var 'producer' is static on KinesisWriterFn. Couldn't setting this to 'null' in @Teardown of some DoFn instance result in random NPEs thrown in some other DoFn currently running @ProcessElement or @FinishBundle? As it seems, it does only work because unsetting producer simply does not cross some memory fences? So probably I have a misunderstanding of the inner workings here. What are you thoughts about this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org