This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new 0d6a5ed  Proper handling of the exceptions from auto persisting in AppenderatorImpl.add() (#5932) (#5984)
0d6a5ed is described below

commit 0d6a5ed3b66e6142b7b35277b71fbc3bae4e6fb9
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 13:55:17 2018 -0700

    Proper handling of the exceptions from auto persisting in AppenderatorImpl.add() (#5932) (#5984)
---
 .../druid/common/guava/ThreadRenamingCallable.java |  4 +-
 .../realtime/appenderator/AppenderatorImpl.java    | 50 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
index 2fd6877..811fe59 100644
--- a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
+++ b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
@@ -35,7 +35,7 @@ public abstract class ThreadRenamingCallable<T> implements Callable<T>
   }
 
   @Override
-  public final T call()
+  public final T call() throws Exception
   {
     final Thread currThread = Thread.currentThread();
     String currName = currThread.getName();
@@ -48,5 +48,5 @@ public abstract class ThreadRenamingCallable<T> implements Callable<T>
     }
   }
 
-  public abstract T doCall();
+  public abstract T doCall() throws Exception;
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 70c2218..3f0f0ba 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -33,12 +33,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.common.guava.ThreadRenamingCallable;
@@ -48,10 +47,13 @@ import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.RE;
 import io.druid.java.util.common.RetryUtils;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.io.Closer;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -131,6 +133,8 @@ public class AppenderatorImpl implements Appenderator
   // and abandon threads do not step over each other
   private final Lock commitLock = new ReentrantLock();
 
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
   private volatile ListeningExecutorService persistExecutor = null;
   private volatile ListeningExecutorService pushExecutor = null;
   // use intermediate executor so that deadlock conditions can be prevented
@@ -140,7 +144,8 @@ public class AppenderatorImpl implements Appenderator
   private volatile long nextFlush;
   private volatile FileLock basePersistDirLock = null;
   private volatile FileChannel basePersistDirLockChannel = null;
-  private AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile Throwable persistError;
 
   public AppenderatorImpl(
       DataSchema schema,
@@ -198,6 +203,13 @@ public class AppenderatorImpl implements Appenderator
     return retVal;
   }
 
+  private void throwPersistErrorIfExists()
+  {
+    if (persistError != null) {
+      throw new RE(persistError, "Error while persisting");
+    }
+  }
+
   @Override
   public AppenderatorAddResult add(
       final SegmentIdentifier identifier,
@@ -206,6 +218,8 @@ public class AppenderatorImpl implements Appenderator
       final boolean allowIncrementalPersists
   ) throws IndexSizeExceededException, SegmentNotWritableException
   {
+    throwPersistErrorIfExists();
+
     if (!identifier.getDataSource().equals(schema.getDataSource())) {
       throw new IAE(
           "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
@@ -244,7 +258,23 @@ public class AppenderatorImpl implements Appenderator
         || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
       if (allowIncrementalPersists) {
         // persistAll clears rowsCurrentlyInMemory, no need to update it.
-        persistAll(committerSupplier == null ? null : committerSupplier.get());
+        Futures.addCallback(
+            persistAll(committerSupplier == null ? null : committerSupplier.get()),
+            new FutureCallback<Object>()
+            {
+              @Override
+              public void onSuccess(@Nullable Object result)
+              {
+                // do nothing
+              }
+
+              @Override
+              public void onFailure(Throwable t)
+              {
+                persistError = t;
+              }
+            }
+        );
       } else {
         isPersistRequired = true;
       }
@@ -340,6 +370,8 @@ public class AppenderatorImpl implements Appenderator
     // Drop commit metadata, then abandon all segments.
 
     try {
+      throwPersistErrorIfExists();
+
       if (persistExecutor != null) {
         final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
             new Callable<Object>()
@@ -373,7 +405,7 @@ public class AppenderatorImpl implements Appenderator
       }
     }
     catch (ExecutionException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -391,6 +423,8 @@ public class AppenderatorImpl implements Appenderator
   @Override
   public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
   {
+    throwPersistErrorIfExists();
+
     final Map<String, Integer> currentHydrants = Maps.newHashMap();
     final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
     int numPersistedRows = 0;
@@ -427,7 +461,7 @@ public class AppenderatorImpl implements Appenderator
         new ThreadRenamingCallable<Object>(threadName)
         {
           @Override
-          public Object doCall()
+          public Object doCall() throws IOException
           {
             try {
               for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
@@ -469,9 +503,9 @@ public class AppenderatorImpl implements Appenderator
               // return null if committer is null
               return commitMetadata;
             }
-            catch (Exception e) {
+            catch (IOException e) {
               metrics.incrementFailedPersists();
-              throw Throwables.propagate(e);
+              throw e;
             }
             finally {
               metrics.incrementNumPersists();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to