This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 83204fc AzureBlobSystemProducer: Catch all Exception in
completableFuture during flush of producer (#1363)
83204fc is described below
commit 83204fc6e72e1a18e3b70420e777bce079c92757
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Tue Feb 16 18:59:07 2021 -0800
AzureBlobSystemProducer: Catch all Exception in completableFuture during
flush of producer (#1363)
* AzureBlobSystemProducer: Catch all throwable in completableFuture during
flush of the producer
* catch Exception instead of throwable
* Empty commit to Trigger Travis Build
* Empty commit to Trigger Travis Build again
* fix checkstyle build failure
---
.../azureblob/producer/AzureBlobSystemProducer.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 5ecd528..d89f38f 100644
---
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -486,15 +486,15 @@ public class AzureBlobSystemProducer implements
SystemProducer {
sourceWriterMap.forEach((stream, writer) -> {
LOG.info("Closing topic:{}", stream);
CompletableFuture<Void> future = CompletableFuture.runAsync(new
Runnable() {
- @Override
- public void run() {
- try {
- writer.close();
- } catch (IOException e) {
- throw new SystemProducerException("Close failed for topic " +
stream, e);
+ @Override
+ public void run() {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ throw new SystemProducerException("Close failed for topic " +
stream, e);
+ }
}
- }
- }, asyncBlobThreadPool);
+ }, asyncBlobThreadPool);
pendingClose.add(future);
future.handle((aVoid, throwable) -> {
sourceWriterMap.remove(writer);