This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2ed70  SAMZA-2712: AzureBlob SystemProducer: flushtimeout is not 
respected if main thread is uploading to azure and azure upload is stuck (#1564)
2e2ed70 is described below

commit 2e2ed70e1904440ac61689e775b02bd95bfd9053
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Mon Dec 6 10:05:10 2021 -0800

    SAMZA-2712: AzureBlob SystemProducer: flushtimeout is not respected if main 
thread is uploading to azure and azure upload is stuck (#1564)
---
 .../azureblob/avro/AzureBlobOutputStream.java      | 12 +++-
 .../azureblob/avro/TestAzureBlobOutputStream.java  | 69 +++++++++++++++++++++-
 samza-azure/src/test/resources/log4j.xml           |  2 +-
 samza-azure/src/test/resources/log4j2.xml          |  2 +-
 4 files changed, 81 insertions(+), 4 deletions(-)

diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 8ca97e0..0e05aef 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.samza.system.azureblob.avro;
 
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
 import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
 import org.apache.samza.AzureException;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -44,6 +45,9 @@ import 
org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 
 /**
  * This class extends {@link java.io.OutputStream} and uses {@link 
java.io.ByteArrayOutputStream}
@@ -273,7 +277,13 @@ public class AzureBlobOutputStream extends OutputStream {
   // SAMZA-2476 stubbing BlockBlobAsyncClient.stageBlock was causing flaky 
tests.
   @VisibleForTesting
   void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int 
blockSize) throws InterruptedException {
-    blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream), 
blockSize).block();
+    invokeBlobClientStageBlock(blockIdEncoded, outputStream, 
blockSize).subscribeOn(Schedulers.boundedElastic()).block(
+        Duration.ofMillis(flushTimeoutMs));
+  }
+
+  @VisibleForTesting
+  Mono<Void> invokeBlobClientStageBlock(String blockIdEncoded, ByteBuffer 
outputStream, int blockSize) {
+    return blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream), 
blockSize);
   }
 
   // blockList cleared makes it hard to test close
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index 4412edf..e4f3cc5 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.samza.system.azureblob.avro;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
 import org.apache.samza.AzureException;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -46,6 +47,8 @@ import org.mockito.ArgumentCaptor;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
 import static org.mockito.Mockito.any;
@@ -85,6 +88,9 @@ public class TestAzureBlobOutputStream {
   private static final String BLOB_RECORD_NUMBER_METADATA = "numberOfRecords";
   private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory = 
mock(BlobMetadataGeneratorFactory.class);
   private final Config blobMetadataGeneratorConfig = mock(Config.class);
+  private final BlobMetadataGenerator mockBlobMetadataGenerator = 
mock(BlobMetadataGenerator.class);
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestAzureBlobOutputStream.class);
 
   @Before
   public void setup() throws Exception {
@@ -103,7 +109,6 @@ public class TestAzureBlobOutputStream {
     mockCompression = mock(Compression.class);
     doReturn(COMPRESSED_BYTES).when(mockCompression).compress(BYTES);
 
-    BlobMetadataGenerator mockBlobMetadataGenerator = 
mock(BlobMetadataGenerator.class);
     doAnswer(invocation -> {
       BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, 
BlobMetadataContext.class);
       String streamName = blobMetadataContext.getStreamName();
@@ -416,6 +421,68 @@ public class TestAzureBlobOutputStream {
     Assert.assertEquals(BYTES.length + BYTES.length - 10, 
azureBlobOutputStream.getSize());
   }
 
+  /**
+   * Test to ensure that the flush timeout is respected even if the block upload 
to Azure is stuck / taking longer than the flush timeout.
+   * A countdown latch is used to mimic the upload to Azure being stuck.
+   * If the flush timeout is respected, then an exception is thrown when the 
flush-timeout-ms duration expires;
+   * otherwise, if the timeout is not respected (i.e. the bug is not fixed), then no exception 
is thrown and the test hangs.
+   * In this test, the flush timeout is chosen to be 10 milliseconds, at the 
end of which, an AzureException of upload failed is thrown.
+   * @throws Exception
+   * @throws InterruptedException
+   */
+  @Test(expected = AzureException.class)
+  public void  testRespectFlushTimeout() throws Exception, 
InterruptedException {
+    // get the threadpool to be the exactly the same as that passed down to 
AzureBlobOutputStream from AzureBlobSystemProducer
+    threadPool = new ThreadPoolExecutor(1, 1, 60,
+        TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1), new 
ThreadPoolExecutor.CallerRunsPolicy());
+
+    // set a very small flushtimeout of 10ms to avoid test taking too long to 
complete
+    azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, 
threadPool, mockMetrics,
+        blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
+        10, THRESHOLD, mockByteArrayOutputStream, mockCompression));
+
+    doNothing().when(azureBlobOutputStream).clearAndMarkClosed();
+    
doReturn(mockBlobMetadataGenerator).when(azureBlobOutputStream).getBlobMetadataGenerator();
+    when(mockCompression.compress(BYTES)).thenReturn(COMPRESSED_BYTES, 
COMPRESSED_BYTES, COMPRESSED_BYTES, COMPRESSED_BYTES);
+
+    // create a latch to mimic uploads getting stuck
+    // and hence unable to honor the flush timeout without the fix in stageBlock
+    // fix in stageBlock = 
subscribeOn(Schedulers.boundedElastic()).block(flushTimeout)
+    CountDownLatch latch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+
+      String blockid = invocation.getArgumentAt(0, String.class);
+      return Mono.just(1).map(integer -> {
+        try {
+          LOG.info("For block id = " + blockid + " start waiting on the 
countdown latch ");
+          // start waiting indefinitely -> mimics a stuck upload
+          latch.await();
+          // below log will never be reached
+          LOG.info("For block id = " + blockid + " done waiting on the 
countdown latch ");
+        } catch (Exception e) {
+          LOG.info("For block id = " + blockid + " an exception was caught " + 
e);
+        }
+        return "One";
+      });
+    }).when(azureBlobOutputStream).invokeBlobClientStageBlock(anyString(), 
anyObject(), anyInt());
+
+    doAnswer(invocation -> {
+      LOG.info("commit block ");
+      return null;
+    }).when(azureBlobOutputStream).commitBlob(anyObject(), anyMap());
+
+    azureBlobOutputStream.write(BYTES, 0, THRESHOLD / 2);
+    azureBlobOutputStream.write(BYTES, THRESHOLD / 2, THRESHOLD / 2);
+    azureBlobOutputStream.write(BYTES, 0, THRESHOLD / 2);
+    azureBlobOutputStream.write(BYTES, THRESHOLD / 2, THRESHOLD / 2);
+    azureBlobOutputStream.write(BYTES, 0, THRESHOLD / 2);
+    azureBlobOutputStream.write(BYTES, THRESHOLD / 2, THRESHOLD / 2);
+    // close will wait for all pending uploads to finish
+    // since the uploads are "stuck" (waiting for latch countdown), 
flushtimeout will get triggered
+    // and throw an exception saying upload failed.
+    azureBlobOutputStream.close();
+  }
+
   private String blockIdEncoded(int blockNum) {
     String blockId = String.format("%05d", blockNum);
     return Base64.getEncoder().encodeToString(blockId.getBytes());
diff --git a/samza-azure/src/test/resources/log4j.xml 
b/samza-azure/src/test/resources/log4j.xml
index 4969cfd..8f302a3 100644
--- a/samza-azure/src/test/resources/log4j.xml
+++ b/samza-azure/src/test/resources/log4j.xml
@@ -16,7 +16,7 @@
   <appender name="console" class="org.apache.log4j.ConsoleAppender">
     <layout class="org.apache.log4j.PatternLayout">
       <param name="ConversionPattern"
-             value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+             value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L [%t] - %m%n" />
     </layout>
   </appender>
 
diff --git a/samza-azure/src/test/resources/log4j2.xml 
b/samza-azure/src/test/resources/log4j2.xml
index db72813..cdeff7e 100644
--- a/samza-azure/src/test/resources/log4j2.xml
+++ b/samza-azure/src/test/resources/log4j2.xml
@@ -15,7 +15,7 @@
 
   <Appenders>
     <Console name="STDOUT" target="SYSTEM_OUT">
-      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
+      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L [%t] - 
 %m%n"/>
     </Console>
   </Appenders>
 

Reply via email to