This is an automated email from the ASF dual-hosted git repository.

xvrl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a934b26  remove ListenableFutures and revert to using the Guava 
implementation (#9944)
a934b26 is described below

commit a934b2664ca930dfa04c65bd47c225b4cb8dfed1
Author: Xavier Léauté <[email protected]>
AuthorDate: Wed Jun 3 10:46:03 2020 -0700

    remove ListenableFutures and revert to using the Guava implementation 
(#9944)
    
    This change removes ListenableFutures.transformAsync in favor of the
    existing Guava Futures.transform implementation. Our own implementation
    had a bug which did not fail the future if the applied function threw an
    exception, resulting in the future never completing.
    
    An attempt was made to fix this bug; however, when run against Guava's own
    tests, our version failed another half dozen tests, so it was decided not to
    continue down that path and to scrap our own implementation.
    
    Explanation of how this bug manifested itself:
    
    An exception thrown in BaseAppenderatorDriver.publishInBackground when
    invoked via transformAsync in StreamAppenderatorDriver.publish will
    cause the resulting future to never complete.
    
    This explains why when encountering 
https://github.com/apache/druid/issues/9845
    the task will never complete, forever waiting for the publishFuture to
    register the handoff. As a result, the corresponding "Error while
    publishing segments ..." message only gets logged once the index task
    times out and is forcefully shut down when the future is force-cancelled
    by the executor.
---
 codestyle/druid-forbidden-apis.txt                 |  1 -
 .../util/common/concurrent/ListenableFutures.java  | 75 ----------------------
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  7 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  7 +-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |  7 +-
 .../appenderator/BatchAppenderatorDriver.java      |  7 +-
 .../appenderator/StreamAppenderatorDriver.java     | 10 +--
 7 files changed, 22 insertions(+), 92 deletions(-)

diff --git a/codestyle/druid-forbidden-apis.txt 
b/codestyle/druid-forbidden-apis.txt
index 0ae33f5..95c7d77 100644
--- a/codestyle/druid-forbidden-apis.txt
+++ b/codestyle/druid-forbidden-apis.txt
@@ -24,7 +24,6 @@ com.google.common.io.Files#createTempDir() @ Use 
org.apache.druid.java.util.comm
 com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use 
org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
 com.google.common.util.concurrent.MoreExecutors#newDirectExecutorService() @ 
Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
 com.google.common.util.concurrent.MoreExecutors#directExecutor() @ Use 
org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
-com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture,
 com.google.common.util.concurrent.AsyncFunction) @ Use 
org.apache.druid.java.util.common.concurrent.ListenableFutures#transformAsync
 java.io.File#toURL() @ Use java.io.File#toURI() and java.net.URI#toURL() 
instead
 java.lang.String#matches(java.lang.String) @ Use startsWith(), endsWith(), 
contains(), or compile and cache a Pattern explicitly
 java.lang.String#replace(java.lang.CharSequence,java.lang.CharSequence) @ Use 
one of the appropriate methods in StringUtils instead
diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
 
b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
deleted file mode 100644
index 1722e4a..0000000
--- 
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.common.concurrent;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import javax.annotation.Nullable;
-import java.util.function.Function;
-
-public class ListenableFutures
-{
-  /**
-   * Guava 19 changes the Futures.transform signature so that the async form 
is different. This is here as a
-   * compatability layer until such a time as druid only supports Guava 19 or 
later, in which case
-   * Futures.transformAsync should be used
-   *
-   * This is NOT copied from guava.
-   */
-  public static <I, O> ListenableFuture<O> transformAsync(
-      final ListenableFuture<I> inFuture,
-      final Function<I, ListenableFuture<O>> transform
-  )
-  {
-    final SettableFuture<O> finalFuture = SettableFuture.create();
-    Futures.addCallback(inFuture, new FutureCallback<I>()
-    {
-      @Override
-      public void onSuccess(@Nullable I result)
-      {
-        final ListenableFuture<O> transformFuture = transform.apply(result);
-        Futures.addCallback(transformFuture, new FutureCallback<O>()
-        {
-          @Override
-          public void onSuccess(@Nullable O result)
-          {
-            finalFuture.set(result);
-          }
-
-          @Override
-          public void onFailure(Throwable t)
-          {
-            finalFuture.setException(t);
-          }
-        });
-      }
-
-      @Override
-      public void onFailure(Throwable t)
-      {
-        finalFuture.setException(t);
-      }
-    });
-    return finalFuture;
-  }
-}
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index eaa2f99..46a6a04 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.test.TestingCluster;
@@ -74,7 +76,6 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
@@ -900,12 +901,12 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     final ListenableFuture<TaskStatus> normalReplicaFuture = 
runTask(normalReplica);
     // Simulating one replica is slower than the other
-    final ListenableFuture<TaskStatus> staleReplicaFuture = 
ListenableFutures.transformAsync(
+    final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
         taskExec.submit(() -> {
           Thread.sleep(1000);
           return staleReplica;
         }),
-        this::runTask
+        (AsyncFunction<Task, TaskStatus>) this::runTask
     );
 
     while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 4daa2fd..142f81f 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -30,6 +30,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.name.Named;
@@ -78,7 +80,6 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -2429,12 +2430,12 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     ((TestableKinesisIndexTask) 
staleReplica).setLocalSupplier(recordSupplier2);
     final ListenableFuture<TaskStatus> normalReplicaFuture = 
runTask(normalReplica);
     // Simulating one replica is slower than the other
-    final ListenableFuture<TaskStatus> staleReplicaFuture = 
ListenableFutures.transformAsync(
+    final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
         taskExec.submit(() -> {
           Thread.sleep(1000);
           return staleReplica;
         }),
-        this::runTask
+        (AsyncFunction<Task, TaskStatus>) this::runTask
     );
 
     while (normalReplica.getRunner().getStatus() != 
SeekableStreamIndexTaskRunner.Status.PAUSED) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 1f9865d..c96b7f5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.io.FileUtils;
@@ -65,7 +66,6 @@ import 
org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -682,7 +682,10 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
         committerSupplier.get(),
         Collections.singletonList(sequenceName)
     );
-    pendingHandoffs.add(ListenableFutures.transformAsync(publishFuture, 
driver::registerHandoff));
+    pendingHandoffs.add(Futures.transform(
+        publishFuture,
+        (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) 
driver::registerHandoff
+    ));
   }
 
   private void waitForSegmentPublishAndHandoff(long timeout) throws 
InterruptedException, ExecutionException,
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
index 7d1f3e2..7be1a0b 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
@@ -22,10 +22,11 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import 
org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
 import org.apache.druid.timeline.DataSegment;
@@ -138,9 +139,9 @@ public class BatchAppenderatorDriver extends 
BaseAppenderatorDriver
   {
     final Set<SegmentIdWithShardSpec> requestedSegmentIdsForSequences = 
getAppendingSegments(sequenceNames);
 
-    final ListenableFuture<SegmentsAndCommitMetadata> future = 
ListenableFutures.transformAsync(
+    final ListenableFuture<SegmentsAndCommitMetadata> future = 
Futures.transform(
         pushInBackground(null, requestedSegmentIdsForSequences, false),
-        this::dropInBackground
+        (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) 
this::dropInBackground
     );
 
     final SegmentsAndCommitMetadata segmentsAndCommitMetadata =
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 454ea96..0555f2f 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +33,6 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ListenableFutures;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.SegmentDescriptor;
@@ -273,11 +273,11 @@ public class StreamAppenderatorDriver extends 
BaseAppenderatorDriver
   {
     final List<SegmentIdWithShardSpec> theSegments = 
getSegmentIdsWithShardSpecs(sequenceNames);
 
-    final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = 
ListenableFutures.transformAsync(
+    final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = 
Futures.transform(
         // useUniquePath=true prevents inconsistencies in segment data when 
task failures or replicas leads to a second
         // version of a segment with the same identifier containing different 
data; see DataSegmentPusher.push() docs
         pushInBackground(wrapCommitter(committer), theSegments, true),
-        sam -> publishInBackground(
+        (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) 
sam -> publishInBackground(
             null,
             sam,
             publisher
@@ -386,9 +386,9 @@ public class StreamAppenderatorDriver extends 
BaseAppenderatorDriver
       final Collection<String> sequenceNames
   )
   {
-    return ListenableFutures.transformAsync(
+    return Futures.transform(
         publish(publisher, committer, sequenceNames),
-        this::registerHandoff
+        (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) 
this::registerHandoff
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to