adude3141 commented on a change in pull request #15955:
URL: https://github.com/apache/beam/pull/15955#discussion_r753827901



##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -1067,8 +1063,11 @@ private void checkForFailures(String message) throws 
IOException {
 
       @Teardown
       public void teardown() throws Exception {
-        if (producer != null && producer.getOutstandingRecordsCount() > 0) {
-          producer.flushSync();
+        if (producer != null) {
+          if (producer.getOutstandingRecordsCount() > 0) {
+            producer.flushSync();
+          }
+          producer.destroy();
         }
         producer = null;

Review comment:
       @aromanenko-dev After stumbling across this intended change, and as you 
reviewed the [originating MR](https://github.com/apache/beam/pull/9640) where 
the code is originating I am wondering, how this is working at all.
   
   Iiuc this var 'producer' is static on KinesisWriterFn. Couldn't setting this 
to 'null' in @Teardown of some DoFn instance result in random NPEs thrown in 
some other DoFn currently running @ProcessElement or @FinishBundle? As it 
seems, it does only work because unsetting producer simply does not cross some 
memory fences?
   
   So probably I have a misunderstanding of the inner workings here. What are 
you thoughts about this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to