This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new eb3de5c SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during
flush. NPE caused by unhandled previous exceptions (#1392)
eb3de5c is described below
commit eb3de5cc116b489cccde7076f6c672e2535e2a3e
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Wed Feb 10 17:04:12 2021 -0800
SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE
caused by unhandled previous exceptions (#1392)
* SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE
caused by unhandled previous exceptions
* address comments
* fix checkstyle build failure
---
.../samza/system/azureblob/avro/AzureBlobAvroWriter.java | 4 +++-
.../samza/system/azureblob/avro/TestAzureBlobAvroWriter.java | 12 ++++++++++++
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 85c2b33..8798787 100644
---
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -195,7 +195,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter
{
@Override
public void flush() throws IOException {
synchronized (currentDataFileWriterLock) {
- currentBlobWriterComponents.dataFileWriter.flush();
+ if (!isClosed && currentBlobWriterComponents != null) {
+ currentBlobWriterComponents.dataFileWriter.flush();
+ }
}
}
diff --git
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 51d2cdc..f14b82b 100644
---
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -245,6 +245,18 @@ public class TestAzureBlobAvroWriter {
}
@Test
+ public void testNPEinFlush() throws Exception {
+ // do not provide the dataFileWriter, azureBlobOutputStream and blockBlob
client -- to force creation during first write
+ azureBlobAvroWriter =
+ spy(new
AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class),
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
+ 60000, "test", null, null, null,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
STREAM_NAME,
+ Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); //
keeping blob size and number of records unlimited
+ when(azureBlobAvroWriter.encodeRecord((IndexedRecord)
ome.getMessage())).thenThrow(IllegalStateException.class);
+ azureBlobAvroWriter.flush(); // No NPE: flush() now null-checks
currentBlobWriterComponents
+ }
+
+ @Test
public void testMaxBlobSizeExceeded() throws Exception {
String blobUrlPrefix = "test";
String blobNameRegex =
"test/[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}/[0-9]{2}-[0-9]{2}-.{8}.avro.gz";