This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new 0d6a5ed Proper handling of the exceptions from auto persisting in AppenderatorImpl.add() (#5932) (#5984)
0d6a5ed is described below
commit 0d6a5ed3b66e6142b7b35277b71fbc3bae4e6fb9
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 13:55:17 2018 -0700
Proper handling of the exceptions from auto persisting in AppenderatorImpl.add() (#5932) (#5984)
---
.../druid/common/guava/ThreadRenamingCallable.java | 4 +-
.../realtime/appenderator/AppenderatorImpl.java | 50 ++++++++++++++++++----
2 files changed, 44 insertions(+), 10 deletions(-)
diff --git a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
index 2fd6877..811fe59 100644
--- a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
+++ b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
@@ -35,7 +35,7 @@ public abstract class ThreadRenamingCallable<T> implements Callable<T>
}
@Override
- public final T call()
+ public final T call() throws Exception
{
final Thread currThread = Thread.currentThread();
String currName = currThread.getName();
@@ -48,5 +48,5 @@ public abstract class ThreadRenamingCallable<T> implements Callable<T>
}
}
- public abstract T doCall();
+ public abstract T doCall() throws Exception;
}
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 70c2218..3f0f0ba 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -33,12 +33,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
@@ -48,10 +47,13 @@ import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.RE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.io.Closer;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -131,6 +133,8 @@ public class AppenderatorImpl implements Appenderator
// and abandon threads do not step over each other
private final Lock commitLock = new ReentrantLock();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService pushExecutor = null;
// use intermediate executor so that deadlock conditions can be prevented
@@ -140,7 +144,8 @@ public class AppenderatorImpl implements Appenderator
private volatile long nextFlush;
private volatile FileLock basePersistDirLock = null;
private volatile FileChannel basePersistDirLockChannel = null;
- private AtomicBoolean closed = new AtomicBoolean(false);
+
+ private volatile Throwable persistError;
public AppenderatorImpl(
DataSchema schema,
@@ -198,6 +203,13 @@ public class AppenderatorImpl implements Appenderator
return retVal;
}
+ private void throwPersistErrorIfExists()
+ {
+ if (persistError != null) {
+ throw new RE(persistError, "Error while persisting");
+ }
+ }
+
@Override
public AppenderatorAddResult add(
final SegmentIdentifier identifier,
@@ -206,6 +218,8 @@ public class AppenderatorImpl implements Appenderator
final boolean allowIncrementalPersists
) throws IndexSizeExceededException, SegmentNotWritableException
{
+ throwPersistErrorIfExists();
+
if (!identifier.getDataSource().equals(schema.getDataSource())) {
throw new IAE(
"Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
@@ -244,7 +258,23 @@ public class AppenderatorImpl implements Appenderator
|| rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
if (allowIncrementalPersists) {
// persistAll clears rowsCurrentlyInMemory, no need to update it.
- persistAll(committerSupplier == null ? null : committerSupplier.get());
+ Futures.addCallback(
+ persistAll(committerSupplier == null ? null : committerSupplier.get()),
+ new FutureCallback<Object>()
+ {
+ @Override
+ public void onSuccess(@Nullable Object result)
+ {
+ // do nothing
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ persistError = t;
+ }
+ }
+ );
} else {
isPersistRequired = true;
}
@@ -340,6 +370,8 @@ public class AppenderatorImpl implements Appenderator
// Drop commit metadata, then abandon all segments.
try {
+ throwPersistErrorIfExists();
+
if (persistExecutor != null) {
final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
new Callable<Object>()
@@ -373,7 +405,7 @@ public class AppenderatorImpl implements Appenderator
}
}
catch (ExecutionException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@@ -391,6 +423,8 @@ public class AppenderatorImpl implements Appenderator
@Override
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
+ throwPersistErrorIfExists();
+
final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
@@ -427,7 +461,7 @@ public class AppenderatorImpl implements Appenderator
new ThreadRenamingCallable<Object>(threadName)
{
@Override
- public Object doCall()
+ public Object doCall() throws IOException
{
try {
for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
@@ -469,9 +503,9 @@ public class AppenderatorImpl implements Appenderator
// return null if committer is null
return commitMetadata;
}
- catch (Exception e) {
+ catch (IOException e) {
metrics.incrementFailedPersists();
- throw Throwables.propagate(e);
+ throw e;
}
finally {
metrics.incrementNumPersists();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]