This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c9a27e3 Don't let catch/finally suppress main exception in
IncrementalPublishingKafkaIndexTaskRunner (#6258)
c9a27e3 is described below
commit c9a27e3e8e3ad4dc3a67037daa95677b4d3bae2e
Author: Jonathan Wei <[email protected]>
AuthorDate: Tue Aug 28 16:12:02 2018 -0700
Don't let catch/finally suppress main exception in
IncrementalPublishingKafkaIndexTaskRunner (#6258)
---
.../IncrementalPublishingKafkaIndexTaskRunner.java | 66 +++++++++++++++++-----
1 file changed, 52 insertions(+), 14 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 434e16e..660ee01 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -317,6 +317,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner
implements KafkaIndexTask
)
);
+ Throwable caughtExceptionOuter = null;
try (final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -412,6 +413,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner
implements KafkaIndexTask
// Could eventually support leader/follower mode (for keeping replicas
more in sync)
boolean stillReading = !assignment.isEmpty();
status = Status.READING;
+ Throwable caughtExceptionInner = null;
try {
while (stillReading) {
if (possiblyPause()) {
@@ -616,12 +618,22 @@ public class IncrementalPublishingKafkaIndexTaskRunner
implements KafkaIndexTask
}
catch (Exception e) {
// (1) catch all exceptions while reading from kafka
+ caughtExceptionInner = e;
log.error(e, "Encountered exception in run() before persisting.");
throw e;
}
finally {
log.info("Persisting all pending data");
- driver.persist(committerSupplier.get()); // persist pending data
+ try {
+ driver.persist(committerSupplier.get()); // persist pending data
+ }
+ catch (Exception e) {
+ if (caughtExceptionInner != null) {
+ caughtExceptionInner.addSuppressed(e);
+ } else {
+ throw e;
+ }
+ }
}
synchronized (statusLock) {
@@ -687,9 +699,18 @@ public class IncrementalPublishingKafkaIndexTaskRunner
implements KafkaIndexTask
catch (InterruptedException | RejectedExecutionException e) {
// (2) catch InterruptedException and RejectedExecutionException thrown
for the whole ingestion steps including
// the final publishing.
- Futures.allAsList(publishWaitList).cancel(true);
- Futures.allAsList(handOffWaitList).cancel(true);
- appenderator.closeNow();
+ caughtExceptionOuter = e;
+ try {
+ Futures.allAsList(publishWaitList).cancel(true);
+ Futures.allAsList(handOffWaitList).cancel(true);
+ if (appenderator != null) {
+ appenderator.closeNow();
+ }
+ }
+ catch (Exception e2) {
+ e.addSuppressed(e2);
+ }
+
// handle the InterruptedException that gets wrapped in a
RejectedExecutionException
if (e instanceof RejectedExecutionException
&& (e.getCause() == null || !(e.getCause() instanceof
InterruptedException))) {
@@ -706,21 +727,38 @@ public class IncrementalPublishingKafkaIndexTaskRunner
implements KafkaIndexTask
}
catch (Exception e) {
// (3) catch all other exceptions thrown for the whole ingestion steps
including the final publishing.
- Futures.allAsList(publishWaitList).cancel(true);
- Futures.allAsList(handOffWaitList).cancel(true);
- appenderator.closeNow();
+ caughtExceptionOuter = e;
+ try {
+ Futures.allAsList(publishWaitList).cancel(true);
+ Futures.allAsList(handOffWaitList).cancel(true);
+ if (appenderator != null) {
+ appenderator.closeNow();
+ }
+ }
+ catch (Exception e2) {
+ e.addSuppressed(e2);
+ }
throw e;
}
finally {
- if (driver != null) {
- driver.close();
+ try {
+ if (driver != null) {
+ driver.close();
+ }
+ if (chatHandlerProvider.isPresent()) {
+ chatHandlerProvider.get().unregister(task.getId());
+ }
+
+ toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+ toolbox.getDataSegmentServerAnnouncer().unannounce();
}
- if (chatHandlerProvider.isPresent()) {
- chatHandlerProvider.get().unregister(task.getId());
+ catch (Exception e) {
+ if (caughtExceptionOuter != null) {
+ caughtExceptionOuter.addSuppressed(e);
+ } else {
+ throw e;
+ }
}
-
- toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
- toolbox.getDataSegmentServerAnnouncer().unannounce();
}
toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]