This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 58a1463  TEZ-4124. GuavaShim: introduce an interoperability layer for 
different guava versions
58a1463 is described below

commit 58a146383068ec8e9760eb66bc00514435e7008e
Author: László Bodor <[email protected]>
AuthorDate: Wed Feb 12 09:45:15 2020 -0600

    TEZ-4124. GuavaShim: introduce an interoperability layer for different 
guava versions
    
    Signed-off-by: Jonathan Eagles <[email protected]>
    (cherry picked from commit 544290e5da84596de5554863737aaee3fffa4d19)
---
 .../main/java/org/apache/tez/common/GuavaShim.java | 54 ++++++++++++++++++++++
 .../dag/app/dag/RootInputInitializerManager.java   |  3 +-
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |  6 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |  3 +-
 .../apache/tez/dag/app/dag/impl/VertexManager.java |  3 +-
 .../org/apache/tez/dag/app/MockDAGAppMaster.java   |  5 +-
 .../dag/app/rm/TestDagAwareYarnTaskScheduler.java  | 12 +++--
 .../tez/dag/app/TezTestServiceCommunicator.java    |  5 +-
 .../tez/service/impl/ContainerRunnerImpl.java      |  5 +-
 .../org/apache/tez/runtime/task/TaskReporter.java  |  3 +-
 .../common/shuffle/impl/ShuffleManager.java        |  5 +-
 .../common/shuffle/orderedgrouped/Shuffle.java     |  3 +-
 .../shuffle/orderedgrouped/ShuffleScheduler.java   |  3 +-
 .../writers/UnorderedPartitionedKVWriter.java      |  3 +-
 14 files changed, 93 insertions(+), 20 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/common/GuavaShim.java 
b/tez-api/src/main/java/org/apache/tez/common/GuavaShim.java
new file mode 100644
index 0000000..d9b8796
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/GuavaShim.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Executor;
+
+/**
+ * A interoperability layer to work with multiple versions of guava.
+ */
+public final class GuavaShim {
+
+  static {
+    try {
+      executorMethod = MoreExecutors.class.getDeclaredMethod("directExecutor");
+    } catch (NoSuchMethodException nsme) {
+      try {
+        executorMethod = 
MoreExecutors.class.getDeclaredMethod("sameThreadExecutor");
+      } catch (NoSuchMethodException nsmeSame) {
+      }
+    }
+  }
+
+  private GuavaShim() {
+  }
+
+  private static Method executorMethod;
+
+  public static Executor directExecutor() {
+    try {
+      return (Executor) executorMethod.invoke(null);
+    } catch (IllegalAccessException | IllegalArgumentException | 
InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index b2a0c0b..7ff9fa9 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -138,7 +139,7 @@ public class RootInputInitializerManager {
       initializerMap.put(input.getName(), initializerWrapper);
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(initializerWrapper, dagUgi, 
appContext));
-      Futures.addCallback(future, 
createInputInitializerCallback(initializerWrapper));
+      Futures.addCallback(future, 
createInputInitializerCallback(initializerWrapper), GuavaShim.directExecutor());
     }
   }
 
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 5dd4be6..a0d4db7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.ProgressHelper;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
@@ -1140,7 +1141,7 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
       }
       for (Map.Entry<OutputKey,CallableEvent> entry : commitEvents.entrySet()) 
{
         ListenableFuture<Void> commitFuture = 
appContext.getExecService().submit(entry.getValue());
-        Futures.addCallback(commitFuture, entry.getValue().getCallback());
+        Futures.addCallback(commitFuture, entry.getValue().getCallback(), 
GuavaShim.directExecutor());
         commitFutures.put(entry.getKey(), commitFuture);
       }
     }
@@ -2150,7 +2151,8 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
                 };
               };
               ListenableFuture<Void> groupCommitFuture = 
appContext.getExecService().submit(groupCommitCallableEvent);
-              Futures.addCallback(groupCommitFuture, 
groupCommitCallableEvent.getCallback());
+              Futures.addCallback(groupCommitFuture, 
groupCommitCallableEvent.getCallback(),
+                  GuavaShim.directExecutor());
               commitFutures.put(outputKey, groupCommitFuture);
             }
           }
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4af4856..1504c98 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.ProgressHelper;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
@@ -2258,7 +2259,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         };
         ListenableFuture<Void> commitFuture = 
             
vertex.getAppContext().getExecService().submit(commitCallableEvent);
-        Futures.addCallback(commitFuture, commitCallableEvent.getCallback());
+        Futures.addCallback(commitFuture, commitCallableEvent.getCallback(), 
GuavaShim.directExecutor());
         vertex.commitFutures.put(outputName, commitFuture);
       }
     }
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 03e03aa..2927425 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -78,6 +78,7 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.base.Function;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.Preconditions;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
@@ -488,7 +489,7 @@ public class VertexManager {
       VertexManagerEvent e = eventQueue.poll();
       if (e != null) {
         ListenableFuture<Void> future = execService.submit(e);
-        Futures.addCallback(future, e.getCallback());
+        Futures.addCallback(future, e.getCallback(), 
GuavaShim.directExecutor());
       } else {
         // This may happen. Lets say Callback succeeded on threadA. It set 
eventInFlight to false 
         // and called tryScheduleNextEvent() and found queue not empty but got 
paused before it 
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 638d084..9bceaec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.TezApiVersionInfo;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.GuavaShim;
+import org.apache.tez.common.Preconditions;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -73,7 +75,6 @@ import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
-import org.apache.tez.common.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.FutureCallback;
@@ -323,7 +324,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
             Worker worker = workers.remove();
             worker.setContainerData(cData);
             ListenableFuture<Void> future = executorService.submit(worker);
-            Futures.addCallback(future, worker.getCallback());            
+            Futures.addCallback(future, worker.getCallback(), 
GuavaShim.directExecutor());
           } else {
             containers.remove(cData.cId);
           }
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
index 3808890..48dd938 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.tez.common.MockDNSToSwitchMapping;
@@ -1560,6 +1561,12 @@ public class TestDagAwareYarnTaskScheduler {
     }
 
     @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(String 
appHostName, int appHostPort,
+        String appTrackingUrl) throws YarnException, IOException {
+      return client.registerApplicationMaster(appHostName, appHostPort, 
appTrackingUrl);
+    }
+
+    @Override
     protected void serviceStart() {
     }
 
@@ -1585,10 +1592,9 @@ public class TestDagAwareYarnTaskScheduler {
     protected void serviceStop() {
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public RegisterApplicationMasterResponse registerApplicationMaster(
-        String appHostName, int appHostPort, String appTrackingUrl) {
+    public RegisterApplicationMasterResponse registerApplicationMaster(String 
appHostName, int appHostPort,
+        String appTrackingUrl) {
       mockRegResponse = mock(RegisterApplicationMasterResponse.class);
       Resource mockMaxResource = Resources.createResource(1024*1024, 1024);
       Map<ApplicationAccessType, String> mockAcls = Collections.emptyMap();
diff --git 
a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
 
b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
index ac50878..713a3d3 100644
--- 
a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
+++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
 import org.apache.tez.service.impl.TezTestServiceProtocolClientImpl;
 import 
org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
@@ -69,7 +70,7 @@ public class TezTestServiceCommunicator extends 
AbstractService {
       public void onFailure(Throwable t) {
         callback.indicateError(t);
       }
-    });
+    }, GuavaShim.directExecutor());
 
   }
 
@@ -86,7 +87,7 @@ public class TezTestServiceCommunicator extends 
AbstractService {
       public void onFailure(Throwable t) {
         callback.indicateError(t);
       }
-    });
+    }, GuavaShim.directExecutor());
 
   }
 
diff --git 
a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
 
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index d440d1f..e7777e2 100644
--- 
a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -210,7 +211,7 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
         workingDir, credentials, memoryPerExecutor);
     ListenableFuture<ContainerExecutionResult> future = executorService
         .submit(callable);
-    Futures.addCallback(future, new ContainerRunnerCallback(request, 
callable));
+    Futures.addCallback(future, new ContainerRunnerCallback(request, 
callable), GuavaShim.directExecutor());
   }
 
   /**
@@ -269,7 +270,7 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
         new ExecutionContextImpl(localAddress.get().getHostName()), env, 
localDirs,
         workingDir, credentials, memoryPerExecutor, sharedExecutor);
     ListenableFuture<ContainerExecutionResult> future = 
executorService.submit(callable);
-    Futures.addCallback(future, new TaskRunnerCallback(request, callable));
+    Futures.addCallback(future, new TaskRunnerCallback(request, callable), 
GuavaShim.directExecutor());
   }
 
 
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 809ce32..fb066fd 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
@@ -108,7 +109,7 @@ public class TaskReporter implements TaskReporterInterface {
     currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, 
sendCounterInterval,
         maxEventsToGet, requestCounter, containerIdStr);
     ListenableFuture<Boolean> future = 
heartbeatExecutor.submit(currentCallable);
-    Futures.addCallback(future, new HeartbeatCallback(errorReporter));
+    Futures.addCallback(future, new HeartbeatCallback(errorReporter), 
GuavaShim.directExecutor());
   }
 
   /**
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index cde273f..5943d31 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -303,7 +304,7 @@ public class ShuffleManager implements FetcherCallback {
     Preconditions.checkState(inputManager != null, "InputManager must be 
configured");
 
     ListenableFuture<Void> runShuffleFuture = 
schedulerExecutor.submit(schedulerCallable);
-    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
+    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback(), 
GuavaShim.directExecutor());
     // Shutdown this executor once this task, and the callback complete.
     schedulerExecutor.shutdown();
   }
@@ -374,7 +375,7 @@ public class ShuffleManager implements FetcherCallback {
                 }
                 ListenableFuture<FetchResult> future = fetcherExecutor
                     .submit(fetcher);
-                Futures.addCallback(future, new FetchFutureCallback(fetcher));
+                Futures.addCallback(future, new FetchFutureCallback(fetcher), 
GuavaShim.directExecutor());
                 if (++count >= maxFetchersToRun) {
                   break;
                 }
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 4b426b7..38f079a 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -270,7 +271,7 @@ public class Shuffle implements ExceptionReporter {
   public void run() throws IOException {
     merger.configureAndStart();
     runShuffleFuture = executor.submit(runShuffleCallable);
-    Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback());
+    Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback(), 
GuavaShim.directExecutor());
     executor.shutdown();
   }
 
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index d388b5b..707f920 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
@@ -1440,7 +1441,7 @@ class ShuffleScheduler {
                 FetcherOrderedGrouped fetcherOrderedGrouped = 
constructFetcherForHost(mapHost);
                 runningFetchers.add(fetcherOrderedGrouped);
                 ListenableFuture<Void> future = 
fetcherExecutor.submit(fetcherOrderedGrouped);
-                Futures.addCallback(future, new 
FetchFutureCallback(fetcherOrderedGrouped));
+                Futures.addCallback(future, new 
FetchFutureCallback(fetcherOrderedGrouped), GuavaShim.directExecutor());
               }
             }
           }
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 76a43f9..c979cc0 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -510,7 +511,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
           new ArrayList<WrappedBuffer>(filledBuffers), codec, 
spilledRecordsCounter,
           spillNumber));
       filledBuffers.clear();
-      Futures.addCallback(future, new SpillCallback(spillNumber));
+      Futures.addCallback(future, new SpillCallback(spillNumber), 
GuavaShim.directExecutor());
       // Update once per buffer (instead of every record)
       updateTezCountersAndNotify();
       return true;

Reply via email to