This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2e2ed70 SAMZA-2712: AzureBlob SystemProducer: flushtimeout is not
respected if main thread is uploading to azure and azure upload is stuck (#1564)
2e2ed70 is described below
commit 2e2ed70e1904440ac61689e775b02bd95bfd9053
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Mon Dec 6 10:05:10 2021 -0800
SAMZA-2712: AzureBlob SystemProducer: flushtimeout is not respected if main
thread is uploading to azure and azure upload is stuck (#1564)
---
.../azureblob/avro/AzureBlobOutputStream.java | 12 +++-
.../azureblob/avro/TestAzureBlobOutputStream.java | 69 +++++++++++++++++++++-
samza-azure/src/test/resources/log4j.xml | 2 +-
samza-azure/src/test/resources/log4j2.xml | 2 +-
4 files changed, 81 insertions(+), 4 deletions(-)
diff --git
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 8ca97e0..0e05aef 100644
---
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.samza.system.azureblob.avro;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
import org.apache.samza.AzureException;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -44,6 +45,9 @@ import
org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
/**
* This class extends {@link java.io.OutputStream} and uses {@link
java.io.ByteArrayOutputStream}
@@ -273,7 +277,13 @@ public class AzureBlobOutputStream extends OutputStream {
// SAMZA-2476 stubbing BlockBlobAsyncClient.stageBlock was causing flaky
tests.
@VisibleForTesting
void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int
blockSize) throws InterruptedException {
- blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream),
blockSize).block();
+ invokeBlobClientStageBlock(blockIdEncoded, outputStream,
blockSize).subscribeOn(Schedulers.boundedElastic()).block(
+ Duration.ofMillis(flushTimeoutMs));
+ }
+
+ @VisibleForTesting
+ Mono<Void> invokeBlobClientStageBlock(String blockIdEncoded, ByteBuffer
outputStream, int blockSize) {
+ return blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream),
blockSize);
}
// blockList cleared makes it hard to test close
diff --git
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index 4412edf..e4f3cc5 100644
---
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.samza.system.azureblob.avro;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
import org.apache.samza.AzureException;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -46,6 +47,8 @@ import org.mockito.ArgumentCaptor;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import static org.mockito.Mockito.any;
@@ -85,6 +88,9 @@ public class TestAzureBlobOutputStream {
private static final String BLOB_RECORD_NUMBER_METADATA = "numberOfRecords";
private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory =
mock(BlobMetadataGeneratorFactory.class);
private final Config blobMetadataGeneratorConfig = mock(Config.class);
+ private final BlobMetadataGenerator mockBlobMetadataGenerator =
mock(BlobMetadataGenerator.class);
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestAzureBlobOutputStream.class);
@Before
public void setup() throws Exception {
@@ -103,7 +109,6 @@ public class TestAzureBlobOutputStream {
mockCompression = mock(Compression.class);
doReturn(COMPRESSED_BYTES).when(mockCompression).compress(BYTES);
- BlobMetadataGenerator mockBlobMetadataGenerator =
mock(BlobMetadataGenerator.class);
doAnswer(invocation -> {
BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0,
BlobMetadataContext.class);
String streamName = blobMetadataContext.getStreamName();
@@ -416,6 +421,68 @@ public class TestAzureBlobOutputStream {
Assert.assertEquals(BYTES.length + BYTES.length - 10,
azureBlobOutputStream.getSize());
}
+ /**
+ * Test to ensure that the flush timeout is respected even if a block upload to Azure is stuck or takes longer than the flush timeout.
+ * A countdown latch is used to mimic an upload to Azure getting stuck.
+ * If the flush timeout is respected, an exception is thrown when the flushTimeoutMs duration expires;
+ * otherwise (i.e. if the bug is not fixed), no exception is thrown and the test hangs.
+ * In this test, the flush timeout is chosen to be 10 milliseconds, at the end of which an AzureException indicating upload failure is thrown.
+ * @throws Exception
+ * @throws InterruptedException
+ */
+ @Test(expected = AzureException.class)
+ public void testRespectFlushTimeout() throws Exception,
InterruptedException {
+ // get the threadpool to be exactly the same as that passed down to AzureBlobOutputStream from AzureBlobSystemProducer
+ threadPool = new ThreadPoolExecutor(1, 1, 60,
+ TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1), new
ThreadPoolExecutor.CallerRunsPolicy());
+
+ // set a very small flush timeout of 10ms to avoid the test taking too long to complete
+ azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient,
threadPool, mockMetrics,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
+ 10, THRESHOLD, mockByteArrayOutputStream, mockCompression));
+
+ doNothing().when(azureBlobOutputStream).clearAndMarkClosed();
+
doReturn(mockBlobMetadataGenerator).when(azureBlobOutputStream).getBlobMetadataGenerator();
+ when(mockCompression.compress(BYTES)).thenReturn(COMPRESSED_BYTES,
COMPRESSED_BYTES, COMPRESSED_BYTES, COMPRESSED_BYTES);
+
+ // create a latch to mimic uploads getting stuck,
+ // and hence being unable to honor the flush timeout without the fix in stageBlock
+ // fix in stageBlock = subscribeOn(Schedulers.boundedElastic()).block(flushTimeoutMs)
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+
+ String blockid = invocation.getArgumentAt(0, String.class);
+ return Mono.just(1).map(integer -> {
+ try {
+ LOG.info("For block id = " + blockid + " start waiting on the
countdown latch ");
+ // start waiting indefinitely -> mimic a stuck upload
+ latch.await();
+ // below log will never be reached
+ LOG.info("For block id = " + blockid + " done waiting on the
countdown latch ");
+ } catch (Exception e) {
+ LOG.info("For block id = " + blockid + " an exception was caught " +
e);
+ }
+ return "One";
+ });
+ }).when(azureBlobOutputStream).invokeBlobClientStageBlock(anyString(),
anyObject(), anyInt());
+
+ doAnswer(invocation -> {
+ LOG.info("commit block ");
+ return null;
+ }).when(azureBlobOutputStream).commitBlob(anyObject(), anyMap());
+
+ azureBlobOutputStream.write(BYTES, 0, THRESHOLD / 2);
+ azureBlobOutputStream.write(BYTES, THRESHOLD / 2, THRESHOLD / 2);
+ azureBlobOutputStream.write(BYTES, 0, THRESHOLD / 2);
+ azureBlobOutputStream.write(BYTES, THRESHOLD / 2, THRESHOLD / 2);
+ azureBlobOutputStream.write(BYTES, 0, THRESHOLD / 2);
+ azureBlobOutputStream.write(BYTES, THRESHOLD / 2, THRESHOLD / 2);
+ // close will wait for all pending uploads to finish;
+ // since the uploads are "stuck" (waiting for the latch countdown), the flush timeout will be triggered
+ // and an exception saying the upload failed will be thrown
+ azureBlobOutputStream.close();
+ }
+
private String blockIdEncoded(int blockNum) {
String blockId = String.format("%05d", blockNum);
return Base64.getEncoder().encodeToString(blockId.getBytes());
diff --git a/samza-azure/src/test/resources/log4j.xml
b/samza-azure/src/test/resources/log4j.xml
index 4969cfd..8f302a3 100644
--- a/samza-azure/src/test/resources/log4j.xml
+++ b/samza-azure/src/test/resources/log4j.xml
@@ -16,7 +16,7 @@
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern"
- value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+ value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L [%t] - %m%n" />
</layout>
</appender>
diff --git a/samza-azure/src/test/resources/log4j2.xml
b/samza-azure/src/test/resources/log4j2.xml
index db72813..cdeff7e 100644
--- a/samza-azure/src/test/resources/log4j2.xml
+++ b/samza-azure/src/test/resources/log4j2.xml
@@ -15,7 +15,7 @@
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L [%t] -
%m%n"/>
</Console>
</Appenders>