This is an automated email from the ASF dual-hosted git repository.
xvrl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a934b26 remove ListenableFutures and revert to using the Guava
implementation (#9944)
a934b26 is described below
commit a934b2664ca930dfa04c65bd47c225b4cb8dfed1
Author: Xavier Léauté <[email protected]>
AuthorDate: Wed Jun 3 10:46:03 2020 -0700
remove ListenableFutures and revert to using the Guava implementation
(#9944)
This change removes ListenableFutures.transformAsync in favor of the
existing Guava Futures.transform implementation. Our own implementation
had a bug which did not fail the future if the applied function threw an
exception, resulting in the future never completing.
An attempt was made to fix this bug; however, when running against Guava's
own tests, our version failed another half dozen tests, so it was decided
not to continue down that path and to scrap our own implementation.
Explanation of how this bug manifested itself:
An exception thrown in BaseAppenderatorDriver.publishInBackground when
invoked via transformAsync in StreamAppenderatorDriver.publish will
cause the resulting future to never complete.
This explains why when encountering
https://github.com/apache/druid/issues/9845
the task will never complete, forever waiting for the publishFuture to
register the handoff. As a result, the corresponding "Error while
publishing segments ..." message only gets logged once the index task
times out and is forcefully shut down when the future is force-cancelled
by the executor.
---
codestyle/druid-forbidden-apis.txt | 1 -
.../util/common/concurrent/ListenableFutures.java | 75 ----------------------
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 7 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 7 +-
.../task/AppenderatorDriverRealtimeIndexTask.java | 7 +-
.../appenderator/BatchAppenderatorDriver.java | 7 +-
.../appenderator/StreamAppenderatorDriver.java | 10 +--
7 files changed, 22 insertions(+), 92 deletions(-)
diff --git a/codestyle/druid-forbidden-apis.txt
b/codestyle/druid-forbidden-apis.txt
index 0ae33f5..95c7d77 100644
--- a/codestyle/druid-forbidden-apis.txt
+++ b/codestyle/druid-forbidden-apis.txt
@@ -24,7 +24,6 @@ com.google.common.io.Files#createTempDir() @ Use
org.apache.druid.java.util.comm
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use
org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#newDirectExecutorService() @
Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#directExecutor() @ Use
org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
-com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture,
com.google.common.util.concurrent.AsyncFunction) @ Use
org.apache.druid.java.util.common.concurrent.ListenableFutures#transformAsync
java.io.File#toURL() @ Use java.io.File#toURI() and java.net.URI#toURL()
instead
java.lang.String#matches(java.lang.String) @ Use startsWith(), endsWith(),
contains(), or compile and cache a Pattern explicitly
java.lang.String#replace(java.lang.CharSequence,java.lang.CharSequence) @ Use
one of the appropriate methods in StringUtils instead
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
deleted file mode 100644
index 1722e4a..0000000
---
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.common.concurrent;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import javax.annotation.Nullable;
-import java.util.function.Function;
-
-public class ListenableFutures
-{
- /**
- * Guava 19 changes the Futures.transform signature so that the async form
is different. This is here as a
- * compatability layer until such a time as druid only supports Guava 19 or
later, in which case
- * Futures.transformAsync should be used
- *
- * This is NOT copied from guava.
- */
- public static <I, O> ListenableFuture<O> transformAsync(
- final ListenableFuture<I> inFuture,
- final Function<I, ListenableFuture<O>> transform
- )
- {
- final SettableFuture<O> finalFuture = SettableFuture.create();
- Futures.addCallback(inFuture, new FutureCallback<I>()
- {
- @Override
- public void onSuccess(@Nullable I result)
- {
- final ListenableFuture<O> transformFuture = transform.apply(result);
- Futures.addCallback(transformFuture, new FutureCallback<O>()
- {
- @Override
- public void onSuccess(@Nullable O result)
- {
- finalFuture.set(result);
- }
-
- @Override
- public void onFailure(Throwable t)
- {
- finalFuture.setException(t);
- }
- });
- }
-
- @Override
- public void onFailure(Throwable t)
- {
- finalFuture.setException(t);
- }
- });
- return finalFuture;
- }
-}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index eaa2f99..46a6a04 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.test.TestingCluster;
@@ -74,7 +76,6 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
@@ -900,12 +901,12 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
final ListenableFuture<TaskStatus> normalReplicaFuture =
runTask(normalReplica);
// Simulating one replica is slower than the other
- final ListenableFuture<TaskStatus> staleReplicaFuture =
ListenableFutures.transformAsync(
+ final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
- this::runTask
+ (AsyncFunction<Task, TaskStatus>) this::runTask
);
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 4daa2fd..142f81f 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -30,6 +30,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.name.Named;
@@ -78,7 +80,6 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -2429,12 +2430,12 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
((TestableKinesisIndexTask)
staleReplica).setLocalSupplier(recordSupplier2);
final ListenableFuture<TaskStatus> normalReplicaFuture =
runTask(normalReplica);
// Simulating one replica is slower than the other
- final ListenableFuture<TaskStatus> staleReplicaFuture =
ListenableFutures.transformAsync(
+ final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
- this::runTask
+ (AsyncFunction<Task, TaskStatus>) this::runTask
);
while (normalReplica.getRunner().getStatus() !=
SeekableStreamIndexTaskRunner.Status.PAUSED) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 1f9865d..c96b7f5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.io.FileUtils;
@@ -65,7 +66,6 @@ import
org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -682,7 +682,10 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
committerSupplier.get(),
Collections.singletonList(sequenceName)
);
- pendingHandoffs.add(ListenableFutures.transformAsync(publishFuture,
driver::registerHandoff));
+ pendingHandoffs.add(Futures.transform(
+ publishFuture,
+ (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>)
driver::registerHandoff
+ ));
}
private void waitForSegmentPublishAndHandoff(long timeout) throws
InterruptedException, ExecutionException,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
index 7d1f3e2..7be1a0b 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
@@ -22,10 +22,11 @@ package org.apache.druid.segment.realtime.appenderator;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.segment.loading.DataSegmentKiller;
import
org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment;
@@ -138,9 +139,9 @@ public class BatchAppenderatorDriver extends
BaseAppenderatorDriver
{
final Set<SegmentIdWithShardSpec> requestedSegmentIdsForSequences =
getAppendingSegments(sequenceNames);
- final ListenableFuture<SegmentsAndCommitMetadata> future =
ListenableFutures.transformAsync(
+ final ListenableFuture<SegmentsAndCommitMetadata> future =
Futures.transform(
pushInBackground(null, requestedSegmentIdsForSequences, false),
- this::dropInBackground
+ (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>)
this::dropInBackground
);
final SegmentsAndCommitMetadata segmentsAndCommitMetadata =
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 454ea96..0555f2f 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +33,6 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
@@ -273,11 +273,11 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
{
final List<SegmentIdWithShardSpec> theSegments =
getSegmentIdsWithShardSpecs(sequenceNames);
- final ListenableFuture<SegmentsAndCommitMetadata> publishFuture =
ListenableFutures.transformAsync(
+ final ListenableFuture<SegmentsAndCommitMetadata> publishFuture =
Futures.transform(
// useUniquePath=true prevents inconsistencies in segment data when
task failures or replicas leads to a second
// version of a segment with the same identifier containing different
data; see DataSegmentPusher.push() docs
pushInBackground(wrapCommitter(committer), theSegments, true),
- sam -> publishInBackground(
+ (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>)
sam -> publishInBackground(
null,
sam,
publisher
@@ -386,9 +386,9 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
final Collection<String> sequenceNames
)
{
- return ListenableFutures.transformAsync(
+ return Futures.transform(
publish(publisher, committer, sequenceNames),
- this::registerHandoff
+ (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>)
this::registerHandoff
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]