This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 58a1463 TEZ-4124. GuavaShim: introduce an interoperability layer for
different guava versions
58a1463 is described below
commit 58a146383068ec8e9760eb66bc00514435e7008e
Author: László Bodor <[email protected]>
AuthorDate: Wed Feb 12 09:45:15 2020 -0600
TEZ-4124. GuavaShim: introduce an interoperability layer for different
guava versions
Signed-off-by: Jonathan Eagles <[email protected]>
(cherry picked from commit 544290e5da84596de5554863737aaee3fffa4d19)
---
.../main/java/org/apache/tez/common/GuavaShim.java | 54 ++++++++++++++++++++++
.../dag/app/dag/RootInputInitializerManager.java | 3 +-
.../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 6 ++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 +-
.../apache/tez/dag/app/dag/impl/VertexManager.java | 3 +-
.../org/apache/tez/dag/app/MockDAGAppMaster.java | 5 +-
.../dag/app/rm/TestDagAwareYarnTaskScheduler.java | 12 +++--
.../tez/dag/app/TezTestServiceCommunicator.java | 5 +-
.../tez/service/impl/ContainerRunnerImpl.java | 5 +-
.../org/apache/tez/runtime/task/TaskReporter.java | 3 +-
.../common/shuffle/impl/ShuffleManager.java | 5 +-
.../common/shuffle/orderedgrouped/Shuffle.java | 3 +-
.../shuffle/orderedgrouped/ShuffleScheduler.java | 3 +-
.../writers/UnorderedPartitionedKVWriter.java | 3 +-
14 files changed, 93 insertions(+), 20 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/common/GuavaShim.java
b/tez-api/src/main/java/org/apache/tez/common/GuavaShim.java
new file mode 100644
index 0000000..d9b8796
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/GuavaShim.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Executor;
+
+/**
+ * An interoperability layer to work with multiple versions of guava.
+ *
+ * <p>The executor factory method on {@link MoreExecutors} is resolved reflectively, once,
+ * when this class is loaded, so this class does not reference either method name at
+ * compile time.
+ */
+public final class GuavaShim {
+
+  /**
+   * Candidate factory method names on {@link MoreExecutors}, in order of preference.
+   */
+  private static final String[] EXECUTOR_METHOD_NAMES = {"directExecutor", "sameThreadExecutor"};
+
+  /**
+   * The resolved factory method, or {@code null} when none of the candidates exists.
+   */
+  private static final Method EXECUTOR_METHOD = findExecutorMethod();
+
+  private GuavaShim() {
+  }
+
+  /**
+   * Returns the first candidate method declared by {@link MoreExecutors}, or {@code null}
+   * when none of them is declared.
+   */
+  private static Method findExecutorMethod() {
+    for (String name : EXECUTOR_METHOD_NAMES) {
+      try {
+        return MoreExecutors.class.getDeclaredMethod(name);
+      } catch (NoSuchMethodException ignored) {
+        // Not declared by the guava version in use; try the next candidate.
+      }
+    }
+    // Deliberately not thrown here: failing inside class initialization would surface as an
+    // ExceptionInInitializerError; directExecutor() reports the problem instead.
+    return null;
+  }
+
+  /**
+   * Returns an executor obtained from guava's {@link MoreExecutors}.
+   *
+   * <p>The underlying factory method is invoked on every call rather than caching its
+   * result, so the instance-sharing behaviour of the guava method in use is left untouched.
+   *
+   * @return the executor returned by the resolved guava factory method
+   * @throws IllegalStateException if none of the candidate factory methods exists
+   * @throws RuntimeException if the reflective invocation fails
+   */
+  public static Executor directExecutor() {
+    if (EXECUTOR_METHOD == null) {
+      throw new IllegalStateException("Neither MoreExecutors.directExecutor() nor "
+          + "MoreExecutors.sameThreadExecutor() is available in the guava version in use");
+    }
+    try {
+      return (Executor) EXECUTOR_METHOD.invoke(null);
+    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index b2a0c0b..7ff9fa9 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.InputDescriptor;
@@ -138,7 +139,7 @@ public class RootInputInitializerManager {
initializerMap.put(input.getName(), initializerWrapper);
ListenableFuture<List<Event>> future = executor
.submit(new InputInitializerCallable(initializerWrapper, dagUgi,
appContext));
- Futures.addCallback(future,
createInputInitializerCallback(initializerWrapper));
+ Futures.addCallback(future,
createInputInitializerCallback(initializerWrapper), GuavaShim.directExecutor());
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 5dd4be6..a0d4db7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.ProgressHelper;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.LimitExceededException;
@@ -1140,7 +1141,7 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
}
for (Map.Entry<OutputKey,CallableEvent> entry : commitEvents.entrySet())
{
ListenableFuture<Void> commitFuture =
appContext.getExecService().submit(entry.getValue());
- Futures.addCallback(commitFuture, entry.getValue().getCallback());
+ Futures.addCallback(commitFuture, entry.getValue().getCallback(),
GuavaShim.directExecutor());
commitFutures.put(entry.getKey(), commitFuture);
}
}
@@ -2150,7 +2151,8 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
};
};
ListenableFuture<Void> groupCommitFuture =
appContext.getExecService().submit(groupCommitCallableEvent);
- Futures.addCallback(groupCommitFuture,
groupCommitCallableEvent.getCallback());
+ Futures.addCallback(groupCommitFuture,
groupCommitCallableEvent.getCallback(),
+ GuavaShim.directExecutor());
commitFutures.put(outputKey, groupCommitFuture);
}
}
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4af4856..1504c98 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.ProgressHelper;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
@@ -2258,7 +2259,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
};
ListenableFuture<Void> commitFuture =
vertex.getAppContext().getExecService().submit(commitCallableEvent);
- Futures.addCallback(commitFuture, commitCallableEvent.getCallback());
+ Futures.addCallback(commitFuture, commitCallableEvent.getCallback(),
GuavaShim.directExecutor());
vertex.commitFutures.put(outputName, commitFuture);
}
}
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 03e03aa..2927425 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -78,6 +78,7 @@ import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import com.google.common.base.Function;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
@@ -488,7 +489,7 @@ public class VertexManager {
VertexManagerEvent e = eventQueue.poll();
if (e != null) {
ListenableFuture<Void> future = execService.submit(e);
- Futures.addCallback(future, e.getCallback());
+ Futures.addCallback(future, e.getCallback(),
GuavaShim.directExecutor());
} else {
// This may happen. Lets say Callback succeeded on threadA. It set
eventInFlight to false
// and called tryScheduleNextEvent() and found queue not empty but got
paused before it
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 638d084..9bceaec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.client.TezApiVersionInfo;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.GuavaShim;
+import org.apache.tez.common.Preconditions;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -73,7 +75,6 @@ import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.common.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
@@ -323,7 +324,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
Worker worker = workers.remove();
worker.setContainerData(cData);
ListenableFuture<Void> future = executorService.submit(worker);
- Futures.addCallback(future, worker.getCallback());
+ Futures.addCallback(future, worker.getCallback(),
GuavaShim.directExecutor());
} else {
containers.remove(cData.cId);
}
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
index 3808890..48dd938 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.common.MockDNSToSwitchMapping;
@@ -1560,6 +1561,12 @@ public class TestDagAwareYarnTaskScheduler {
}
@Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(String
appHostName, int appHostPort,
+ String appTrackingUrl) throws YarnException, IOException {
+ return client.registerApplicationMaster(appHostName, appHostPort,
appTrackingUrl);
+ }
+
+ @Override
protected void serviceStart() {
}
@@ -1585,10 +1592,9 @@ public class TestDagAwareYarnTaskScheduler {
protected void serviceStop() {
}
- @SuppressWarnings("unchecked")
@Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
- String appHostName, int appHostPort, String appTrackingUrl) {
+ public RegisterApplicationMasterResponse registerApplicationMaster(String
appHostName, int appHostPort,
+ String appTrackingUrl) {
mockRegResponse = mock(RegisterApplicationMasterResponse.class);
Resource mockMaxResource = Resources.createResource(1024*1024, 1024);
Map<ApplicationAccessType, String> mockAcls = Collections.emptyMap();
diff --git
a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
index ac50878..713a3d3 100644
---
a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
+++
b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
import org.apache.tez.service.impl.TezTestServiceProtocolClientImpl;
import
org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
@@ -69,7 +70,7 @@ public class TezTestServiceCommunicator extends
AbstractService {
public void onFailure(Throwable t) {
callback.indicateError(t);
}
- });
+ }, GuavaShim.directExecutor());
}
@@ -86,7 +87,7 @@ public class TezTestServiceCommunicator extends
AbstractService {
public void onFailure(Throwable t) {
callback.indicateError(t);
}
- });
+ }, GuavaShim.directExecutor());
}
diff --git
a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index d440d1f..e7777e2 100644
---
a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@@ -210,7 +211,7 @@ public class ContainerRunnerImpl extends AbstractService
implements ContainerRun
workingDir, credentials, memoryPerExecutor);
ListenableFuture<ContainerExecutionResult> future = executorService
.submit(callable);
- Futures.addCallback(future, new ContainerRunnerCallback(request,
callable));
+ Futures.addCallback(future, new ContainerRunnerCallback(request,
callable), GuavaShim.directExecutor());
}
/**
@@ -269,7 +270,7 @@ public class ContainerRunnerImpl extends AbstractService
implements ContainerRun
new ExecutionContextImpl(localAddress.get().getHostName()), env,
localDirs,
workingDir, credentials, memoryPerExecutor, sharedExecutor);
ListenableFuture<ContainerExecutionResult> future =
executorService.submit(callable);
- Futures.addCallback(future, new TaskRunnerCallback(request, callable));
+ Futures.addCallback(future, new TaskRunnerCallback(request, callable),
GuavaShim.directExecutor());
}
diff --git
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 809ce32..fb066fd 100644
---
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
@@ -108,7 +109,7 @@ public class TaskReporter implements TaskReporterInterface {
currentCallable = new HeartbeatCallable(task, umbilical, pollInterval,
sendCounterInterval,
maxEventsToGet, requestCounter, containerIdStr);
ListenableFuture<Boolean> future =
heartbeatExecutor.submit(currentCallable);
- Futures.addCallback(future, new HeartbeatCallback(errorReporter));
+ Futures.addCallback(future, new HeartbeatCallback(errorReporter),
GuavaShim.directExecutor());
}
/**
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index cde273f..5943d31 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
@@ -303,7 +304,7 @@ public class ShuffleManager implements FetcherCallback {
Preconditions.checkState(inputManager != null, "InputManager must be
configured");
ListenableFuture<Void> runShuffleFuture =
schedulerExecutor.submit(schedulerCallable);
- Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
+ Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback(),
GuavaShim.directExecutor());
// Shutdown this executor once this task, and the callback complete.
schedulerExecutor.shutdown();
}
@@ -374,7 +375,7 @@ public class ShuffleManager implements FetcherCallback {
}
ListenableFuture<FetchResult> future = fetcherExecutor
.submit(fetcher);
- Futures.addCallback(future, new FetchFutureCallback(fetcher));
+ Futures.addCallback(future, new FetchFutureCallback(fetcher),
GuavaShim.directExecutor());
if (++count >= maxFetchersToRun) {
break;
}
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 4b426b7..38f079a 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
@@ -270,7 +271,7 @@ public class Shuffle implements ExceptionReporter {
public void run() throws IOException {
merger.configureAndStart();
runShuffleFuture = executor.submit(runShuffleCallable);
- Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback());
+ Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback(),
GuavaShim.directExecutor());
executor.shutdown();
}
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index d388b5b..707f920 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
@@ -1440,7 +1441,7 @@ class ShuffleScheduler {
FetcherOrderedGrouped fetcherOrderedGrouped =
constructFetcherForHost(mapHost);
runningFetchers.add(fetcherOrderedGrouped);
ListenableFuture<Void> future =
fetcherExecutor.submit(fetcherOrderedGrouped);
- Futures.addCallback(future, new
FetchFutureCallback(fetcherOrderedGrouped));
+ Futures.addCallback(future, new
FetchFutureCallback(fetcherOrderedGrouped), GuavaShim.directExecutor());
}
}
}
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 76a43f9..c979cc0 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
@@ -510,7 +511,7 @@ public class UnorderedPartitionedKVWriter extends
BaseUnorderedPartitionedKVWrit
new ArrayList<WrappedBuffer>(filledBuffers), codec,
spilledRecordsCounter,
spillNumber));
filledBuffers.clear();
- Futures.addCallback(future, new SpillCallback(spillNumber));
+ Futures.addCallback(future, new SpillCallback(spillNumber),
GuavaShim.directExecutor());
// Update once per buffer (instead of every record)
updateTezCountersAndNotify();
return true;