This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new eb3de5c  SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during 
flush. NPE caused by unhandled previous exceptions (#1392)
eb3de5c is described below

commit eb3de5cc116b489cccde7076f6c672e2535e2a3e
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Wed Feb 10 17:04:12 2021 -0800

    SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE 
caused by unhandled previous exceptions (#1392)
    
    * SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE 
caused by unhandled previous exceptions
    
    * address comments
    
    * fix checkstyle build failure
---
 .../samza/system/azureblob/avro/AzureBlobAvroWriter.java     |  4 +++-
 .../samza/system/azureblob/avro/TestAzureBlobAvroWriter.java | 12 ++++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 85c2b33..8798787 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -195,7 +195,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter 
{
   @Override
   public void flush() throws IOException {
     synchronized (currentDataFileWriterLock) {
-      currentBlobWriterComponents.dataFileWriter.flush();
+      if (!isClosed && currentBlobWriterComponents != null) {
+        currentBlobWriterComponents.dataFileWriter.flush();
+      }
     }
   }
 
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 51d2cdc..f14b82b 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -245,6 +245,18 @@ public class TestAzureBlobAvroWriter {
   }
 
   @Test
+  public void testNPEinFlush() throws Exception {
+    // do not provide the dataFileWrite, azureBloboutputstream and blockblob 
client -- to force creation during first write
+    azureBlobAvroWriter =
+        spy(new 
AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), 
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
+            60000, "test", null, null, null,
+            blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, 
STREAM_NAME,
+            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // 
keeping blob size and number of records unlimited
+    when(azureBlobAvroWriter.encodeRecord((IndexedRecord) 
ome.getMessage())).thenThrow(IllegalStateException.class);
+    azureBlobAvroWriter.flush(); // No NPE because has null check for 
currentBlobWriterComponents
+  }
+
+  @Test
   public void testMaxBlobSizeExceeded() throws Exception {
     String blobUrlPrefix = "test";
     String blobNameRegex = 
"test/[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}/[0-9]{2}-[0-9]{2}-.{8}.avro.gz";

Reply via email to