This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 61a8b10dbddf93cb63faa0957d55695ab69ad81b
Author: Romain Manni-Bucau <[email protected]>
AuthorDate: Mon Mar 19 16:13:00 2018 +0100

    BEAM-3876 avoid NPE if checkpoint is null in an unbounded source
---
 .../direct/UnboundedReadEvaluatorFactory.java      | 26 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 5c3d2f2..a46d657 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -177,10 +177,28 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
                         watermark)));
           } else {
             // End of input. Close the reader after finalizing old checkpoint.
-            shard.getCheckpoint().finalizeCheckpoint();
-            UnboundedReader<?> toClose = reader;
-            reader = null; // Avoid double close below in case of an exception.
-            toClose.close();
+            // note: can be null for empty datasets so ensure to null check the checkpoint
+            final CheckpointMarkT checkpoint = shard.getCheckpoint();
+            IOException ioe = null;
+            try {
+              if (checkpoint != null) {
+                checkpoint.finalizeCheckpoint();
+              }
+            } catch (final IOException finalizeCheckpointException) {
+              ioe = finalizeCheckpointException;
+            } finally {
+              try {
+                UnboundedReader<?> toClose = reader;
+                reader = null; // Avoid double close below in case of an exception.
+                toClose.close();
+              } catch (final IOException closeEx) {
+                if (ioe != null) {
+                  ioe.addSuppressed(closeEx);
+                } else {
+                  throw closeEx;
+                }
+              }
+            }
           }
         }
       } catch (IOException e) {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to