This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new b2a0511 TEZ-4253: Revert TEZ-4170 (Mustafa İman via Attila Magyar,
Ashutosh Chauhan)
b2a0511 is described below
commit b2a05115dcce28a2b188435b445b6d7e43103ab1
Author: Mustafa İman <[email protected]>
AuthorDate: Mon Dec 7 22:40:07 2020 +0100
TEZ-4253: Revert TEZ-4170 (Mustafa İman via Attila Magyar, Ashutosh Chauhan)
Signed-off-by: Laszlo Bodor <[email protected]>
---
.../dag/app/dag/RootInputInitializerManager.java | 274 +++++++++++----------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 25 +-
.../app/dag/TestRootInputInitializerManager.java | 116 +--------
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 6 +-
.../app/dag/impl/TestRootInputVertexManager.java | 5 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 99 +-------
6 files changed, 180 insertions(+), 345 deletions(-)
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index bd4bcd8..3c4a05e 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -18,33 +18,35 @@
package org.apache.tez.dag.app.dag;
-import static org.apache.tez.dag.app.dag.VertexState.FAILED;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Objects;
-import javax.annotation.Nullable;
+import org.apache.tez.common.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.InputDescriptor;
@@ -52,38 +54,38 @@ import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.*;
import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
-import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
-import org.apache.tez.dag.app.dag.impl.VertexImpl;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
public class RootInputInitializerManager {
private static final Logger LOG =
LoggerFactory.getLogger(RootInputInitializerManager.class);
- @VisibleForTesting
- protected ListeningExecutorService executor;
+ private final ExecutorService rawExecutor;
+ private final ListeningExecutorService executor;
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
private volatile boolean isStopped = false;
@@ -94,135 +96,51 @@ public class RootInputInitializerManager {
private final AppContext appContext;
@VisibleForTesting
- final Map<String, InitializerWrapper> initializerMap = new
ConcurrentHashMap<>();
+ final Map<String, InitializerWrapper> initializerMap = new HashMap<String,
InitializerWrapper>();
public RootInputInitializerManager(Vertex vertex, AppContext appContext,
UserGroupInformation dagUgi,
StateChangeNotifier stateTracker) {
this.appContext = appContext;
this.vertex = vertex;
this.eventHandler = appContext.getEventHandler();
- this.executor = appContext.getExecService();
+ this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("InputInitializer {" +
this.vertex.getName() + "} #%d").build());
+ this.executor = MoreExecutors.listeningDecorator(rawExecutor);
this.dagUgi = dagUgi;
this.entityStateTracker = stateTracker;
}
+ public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>>
+ inputs) throws TezException {
+ for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
input : inputs) {
- public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> inputs,
- List<TezEvent> pendingInitializerEvents) {
- List<InitializerWrapper> initWrappers = createInitializerWrappers(inputs);
- if (!initWrappers.isEmpty()) {
- executor.submit(() ->
createAndStartInitializing(pendingInitializerEvents, initWrappers));
- }
- }
-
- /**
- * Create input wrappers for all inputs in parallel.
- *
- * @param inputs
- * @return
- */
- protected List<InitializerWrapper> createInitializerWrappers(
- List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> inputs) {
- String current = null;
- final List<InitializerWrapper> result = Collections.synchronizedList(new
ArrayList<>());
- try {
- final List<Future<Void>> fResults = new ArrayList<>();
- for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
each : inputs) {
- current = each.getName();
- fResults.add(executor.submit(() -> {
- InitializerWrapper initializer = createInitializerWrapper(each);
- initializerMap.put(each.getName(), initializer);
- registerPendingVertex(each, initializer);
- result.add(initializer);
- return null;
- }));
- }
- for(Future<Void> f : fResults) {
- f.get();
- }
- } catch (InterruptedException | ExecutionException t) {
- failVertex(t, current);
- }
- return result;
- }
-
- void failVertex(Throwable t, String inputName) {
- VertexImpl vertexImpl = (VertexImpl) vertex;
- String msg = "Fail to create InputInitializerManager, " +
ExceptionUtils.getStackTrace(t);
- LOG.info(msg);
- vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
- eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(),
inputName,
- new AMUserCodeException(AMUserCodeException.Source.InputInitializer,
t)));
- }
+ InputInitializerContext context =
+ new TezRootInputInitializerContextImpl(input, vertex, appContext,
this);
- /**
- * Start initializers in parallel.
- *
- * @param pendingInitializerEvents
- * @param result
- */
- protected void createAndStartInitializing(List<TezEvent>
pendingInitializerEvents, List<InitializerWrapper> result) {
- handleInitializerEvents(pendingInitializerEvents);
- pendingInitializerEvents.clear();
- for (InitializerWrapper inputWrapper : result) {
- executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
- }
- }
-
- private void runInitializerAndProcessResult(InitializerWrapper initializer) {
- try {
- List<Event> result = runInitializer(initializer);
- LOG.info("Succeeded InputInitializer for Input: " +
initializer.getInput().getName() +
- " on vertex " + initializer.getVertexLogIdentifier());
- eventHandler.handle(new
VertexEventRootInputInitialized(vertex.getVertexId(),
- initializer.getInput().getName(), result));
- } catch (Throwable t) {
- if (t instanceof UndeclaredThrowableException) {
- t = t.getCause();
- }
- LOG.info("Failed InputInitializer for Input: " +
initializer.getInput().getName() +
- " on vertex " + initializer.getVertexLogIdentifier());
- eventHandler.handle(new
VertexEventRootInputFailed(vertex.getVertexId(),
initializer.getInput().getName(),
- new AMUserCodeException(Source.InputInitializer,t)));
- } finally {
- initializer.setComplete();
- }
- }
-
- private List<Event> runInitializer(InitializerWrapper initializer) throws
IOException, InterruptedException {
- return dagUgi.doAs((PrivilegedExceptionAction<List<Event>>) () -> {
- LOG.info(
- "Starting InputInitializer for Input: " +
initializer.getInput().getName() +
- " on vertex " + initializer.getVertexLogIdentifier());
+ InputInitializer initializer;
try {
- TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
- initializer.vertexId);
- return initializer.getInitializer().initialize();
+ TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
vertex.getVertexId());
+ initializer = createInitializer(input, context);
} finally {
appContext.getHadoopShim().clearHadoopCallerContext();
}
- });
- }
- private InitializerWrapper
createInitializerWrapper(RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor> input) throws TezException {
- InputInitializerContext context =
- new TezRootInputInitializerContextImpl(input, vertex, appContext,
this);
- try {
- TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
vertex.getVertexId());
- InputInitializer initializer = createInitializer(input, context);
- return new InitializerWrapper(input, initializer, context, vertex,
entityStateTracker, appContext);
- } finally {
- appContext.getHadoopShim().clearHadoopCallerContext();
- }
- }
+ InitializerWrapper initializerWrapper =
+ new InitializerWrapper(input, initializer, context, vertex,
entityStateTracker, appContext);
- private void registerPendingVertex(RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor> input, InitializerWrapper initializerWrapper) {
- // Register pending vertex update registrations
- List<VertexUpdateRegistrationHolder> vertexUpdateRegistrations =
pendingVertexRegistrations.removeAll(input.getName());
- if (vertexUpdateRegistrations != null) {
- for (VertexUpdateRegistrationHolder h : vertexUpdateRegistrations) {
- initializerWrapper.registerForVertexStateUpdates(h.vertexName,
h.stateSet);
+ // Register pending vertex update registrations
+ List<VertexUpdateRegistrationHolder> vertexUpdateRegistrations =
+ pendingVertexRegistrations.removeAll(input.getName());
+ if (vertexUpdateRegistrations != null) {
+ for (VertexUpdateRegistrationHolder h : vertexUpdateRegistrations) {
+ initializerWrapper.registerForVertexStateUpdates(h.vertexName,
h.stateSet);
+ }
}
+
+ initializerMap.put(input.getName(), initializerWrapper);
+ ListenableFuture<List<Event>> future = executor
+ .submit(new InputInitializerCallable(initializerWrapper, dagUgi,
appContext));
+ Futures.addCallback(future,
createInputInitializerCallback(initializerWrapper), GuavaShim.directExecutor());
}
}
@@ -316,13 +234,103 @@ public class RootInputInitializerManager {
}
@VisibleForTesting
+ protected InputInitializerCallback
createInputInitializerCallback(InitializerWrapper initializer) {
+ return new InputInitializerCallback(initializer, eventHandler,
vertex.getVertexId());
+ }
+
+ @VisibleForTesting
@InterfaceAudience.Private
public InitializerWrapper getInitializerWrapper(String inputName) {
return initializerMap.get(inputName);
}
public void shutdown() {
- isStopped = true;
+ if (executor != null && !isStopped) {
+ // Don't really care about what is running if an error occurs. If no
error
+ // occurs, all execution is complete.
+ executor.shutdownNow();
+ isStopped = true;
+ }
+ }
+
+ private static class InputInitializerCallable implements
+ Callable<List<Event>> {
+
+ private final InitializerWrapper initializerWrapper;
+ private final UserGroupInformation ugi;
+ private final AppContext appContext;
+
+ InputInitializerCallable(InitializerWrapper initializer,
UserGroupInformation ugi,
+ AppContext appContext) {
+ this.initializerWrapper = initializer;
+ this.ugi = ugi;
+ this.appContext = appContext;
+ }
+
+ @Override
+ public List<Event> call() throws Exception {
+ List<Event> events = ugi.doAs(new
PrivilegedExceptionAction<List<Event>>() {
+ @Override
+ public List<Event> run() throws Exception {
+ LOG.info(
+ "Starting InputInitializer for Input: " +
initializerWrapper.getInput().getName() +
+ " on vertex " + initializerWrapper.getVertexLogIdentifier());
+ try {
+ TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
+ initializerWrapper.vertexId);
+ return initializerWrapper.getInitializer().initialize();
+ } finally {
+ appContext.getHadoopShim().clearHadoopCallerContext();
+ }
+ }
+ });
+ return events;
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @VisibleForTesting
+ private static class InputInitializerCallback implements
+ FutureCallback<List<Event>> {
+
+ private final InitializerWrapper initializer;
+ private final EventHandler eventHandler;
+ private final TezVertexID vertexID;
+
+ InputInitializerCallback(InitializerWrapper initializer,
+ EventHandler eventHandler, TezVertexID vertexID) {
+ this.initializer = initializer;
+ this.eventHandler = eventHandler;
+ this.vertexID = vertexID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onSuccess(List<Event> result) {
+ initializer.setComplete();
+ LOG.info(
+ "Succeeded InputInitializer for Input: " +
initializer.getInput().getName() +
+ " on vertex " + initializer.getVertexLogIdentifier());
+ eventHandler.handle(new VertexEventRootInputInitialized(vertexID,
+ initializer.getInput().getName(), result));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onFailure(Throwable t) {
+ // catch real root cause of failure, it would throw
UndeclaredThrowableException
+ // if using UGI.doAs
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
+ }
+ initializer.setComplete();
+ LOG.info(
+ "Failed InputInitializer for Input: " +
initializer.getInput().getName() +
+ " on vertex " + initializer.getVertexLogIdentifier());
+ eventHandler
+ .handle(new VertexEventRootInputFailed(vertexID,
initializer.getInput().getName(),
+ new AMUserCodeException(Source.InputInitializer, t)));
+ }
}
@VisibleForTesting
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f86dc98..6ae3ba5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2416,7 +2416,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- public VertexState finished(VertexState finalState,
+ VertexState finished(VertexState finalState,
VertexTerminationCause termCause, String diag) {
if (finishTime == 0) setFinishTime();
if (termCause != null) {
@@ -3073,7 +3073,13 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
if (vertex.inputsWithInitializers != null) {
if (vertex.recoveryData == null ||
!vertex.recoveryData.shouldSkipInit()) {
LOG.info("Vertex will initialize from input initializer. " +
vertex.logIdentifier);
- vertex.setupInputInitializerManager();
+ try {
+ vertex.setupInputInitializerManager();
+ } catch (TezException e) {
+ String msg = "Fail to create InputInitializerManager, " +
ExceptionUtils.getStackTrace(e);
+ LOG.info(msg);
+ return vertex.finished(VertexState.FAILED,
VertexTerminationCause.INIT_FAILURE, msg);
+ }
}
return VertexState.INITIALIZING;
} else {
@@ -3106,7 +3112,13 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
if (vertex.inputsWithInitializers != null &&
(vertex.recoveryData == null ||
!vertex.recoveryData.shouldSkipInit())) {
LOG.info("Vertex will initialize from input initializer. " +
vertex.logIdentifier);
- vertex.setupInputInitializerManager();
+ try {
+ vertex.setupInputInitializerManager();
+ } catch (TezException e) {
+ String msg = "Fail to create InputInitializerManager, " +
ExceptionUtils.getStackTrace(e);
+ LOG.error(msg);
+ return vertex.finished(VertexState.FAILED,
VertexTerminationCause.INIT_FAILURE, msg);
+ }
return VertexState.INITIALIZING;
}
if (!vertex.uninitializedEdges.isEmpty()) {
@@ -4243,7 +4255,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- private void setupInputInitializerManager() {
+ private void setupInputInitializerManager() throws TezException {
rootInputInitializerManager = createRootInputInitializerManager(
getDAG().getName(), getName(), getVertexId(),
eventHandler, getTotalTasks(),
@@ -4258,7 +4270,10 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers
for vertex " +
logIdentifier);
initWaitsForRootInitializers = true;
- rootInputInitializerManager.runInputInitializers(inputList,
pendingInitializerEvents);
+ rootInputInitializerManager.runInputInitializers(inputList);
+ // Send pending rootInputInitializerEvents
+
rootInputInitializerManager.handleInitializerEvents(pendingInitializerEvents);
+ pendingInitializerEvents.clear();
}
private static class VertexStateChangedCallback
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
index a197e54..ffa4309 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -14,9 +14,7 @@
package org.apache.tez.dag.app.dag;
-import static
org.apache.tez.dag.app.TestDAGAppMaster.DAGAppMasterForTest.createCredentials;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
@@ -27,43 +25,20 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.File;
import java.io.IOException;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.client.TezApiVersionInfo;
-import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -75,34 +50,11 @@ import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestRootInputInitializerManager {
- private static final File TEST_DIR = new
File(System.getProperty("test.build.data"),
- TestRootInputInitializerManager.class.getName()).getAbsoluteFile();
- private static ListeningExecutorService execService;
-
- @Before
- public void setUp() {
- ExecutorService rawExecutor = Executors.newCachedThreadPool(new
ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("Test App Shared Pool - " +
"#%d").build());
- execService = MoreExecutors.listeningDecorator(rawExecutor);
- FileUtil.fullyDelete(TEST_DIR);
- TEST_DIR.mkdirs();
- }
-
- @After
- public void tearDown() {
- if (execService != null) {
- execService.shutdownNow();
- }
- FileUtil.fullyDelete(TEST_DIR);
- }
-
// Simple testing. No events if task doesn't succeed.
// Also exercises path where two attempts are reported as successful via the
stateChangeNotifier.
// Primarily a failure scenario, when a Task moves back to running from
success
@@ -262,7 +214,6 @@ public class TestRootInputInitializerManager {
AppContext appContext = mock(AppContext.class);
doReturn(new DefaultHadoopShim()).when(appContext).getHadoopShim();
doReturn(mock(EventHandler.class)).when(appContext).getEventHandler();
- when(appContext.getExecService()).thenReturn(execService);
UserGroupInformation dagUgi =
UserGroupInformation.createRemoteUser("fakeuser");
StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
RootInputInitializerManager rootInputInitializerManager = new
RootInputInitializerManager(vertex, appContext, dagUgi, stateChangeNotifier);
@@ -271,73 +222,12 @@ public class TestRootInputInitializerManager {
InputInitializerDescriptor iid =
InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput
=
new RootInputLeafOutput<>("InputName", id, iid);
-
rootInputInitializerManager.runInputInitializers(Collections.singletonList(rootInput),
Collections.emptyList());
+
rootInputInitializerManager.runInputInitializers(Collections.singletonList(rootInput));
InputInitializerForUgiTest.awaitInitialize();
assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi);
- assertEquals(dagUgi.getRealUser(),
InputInitializerForUgiTest.initializeUgi.getRealUser());
- }
-
- @Test (timeout = 10000)
- public synchronized void testParallelInputInitialization() throws
InterruptedException, IOException {
- // Create Local DAGAppMaster with default conf
- Configuration conf = new Configuration(true);
- conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
-
- FileSystem fs = FileSystem.getLocal(conf);
- FSDataOutputStream sessionJarsPBOutStream =
- TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(),
- TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
- DAGProtos.PlanLocalResourcesProto.getDefaultInstance()
- .writeDelimitedTo(sessionJarsPBOutStream);
- sessionJarsPBOutStream.close();
-
- ApplicationId appId = ApplicationId.newInstance(1, 1);
- ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId,
1);
-
- DAGAppMaster am = new DAGAppMaster(attemptId,
- ContainerId.newContainerId(attemptId, 1),
- "127.0.0.1", 0, 0, new SystemClock(), 1, true,
- TEST_DIR.toString(), new String[] {TEST_DIR.toString()},
- new String[] {TEST_DIR.toString()},
- new TezApiVersionInfo().getVersion(), createCredentials(),
- "someuser", null);
- am.init(conf);
-
- Vertex vertex = mock(Vertex.class);
- doReturn(mock(TezVertexID.class)).when(vertex).getVertexId();
- UserGroupInformation dagUgi =
UserGroupInformation.createRemoteUser("fakeuser");
- StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
- RootInputInitializerManager rootInputInitializerManager =
- new RootInputInitializerManager(vertex, am.getContext(), dagUgi,
stateChangeNotifier);
-
- List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inlist = new LinkedList();
- // Make sure we dont have any OOM issue by controlling the capacity of the
thread pool
- // and also block producer (createInitializerWrapper when resources are
saturated)
- InputDescriptor id = mock(InputDescriptor.class);
- InputInitializerDescriptor iid =
InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
- for (int i=0; i < 10000; i++) {
- RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
rootInput =
- new RootInputLeafOutput<>("InputName"+i, id, iid);
- inlist.add(rootInput);
- }
-
- List<RootInputInitializerManager.InitializerWrapper> initWrappers =
- rootInputInitializerManager.createInitializerWrappers(inlist);
-
- int maxThreadSize =
conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
- TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
- ThreadPoolExecutor amThreadPool = am.getContext().getThreadPool();
-
- rootInputInitializerManager.executor.submit(() ->
rootInputInitializerManager
- .createAndStartInitializing(Collections.emptyList(), initWrappers));
-
- while (am.getContext().getThreadPool().getQueue().size() > 0) {
- assertTrue(amThreadPool.getPoolSize() <= maxThreadSize);
- Thread.sleep(100);
- }
+ assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi);
}
public static class InputInitializerForUgiTest extends InputInitializer {
@@ -379,4 +269,4 @@ public class TestRootInputInitializerManager {
}
}
}
-}
+}
\ No newline at end of file
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 9636329..95ea8a0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -330,10 +330,8 @@ public class TestDAGRecovery {
Mockito.doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
- if (args[0] instanceof CallableEvent) {
- CallableEvent e = (CallableEvent) args[0];
- dispatcher.getEventHandler().handle(e);
- }
+ CallableEvent e = (CallableEvent) args[0];
+ dispatcher.getEventHandler().handle(e);
return mockFuture;
}
}).when(execService).submit((Callable<Void>) any());
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
index 16a97d4..39a291e 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
@@ -24,7 +24,7 @@ import static
org.apache.tez.dag.app.dag.impl.RootInputVertexManager.TEZ_ROOT_IN
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -176,7 +176,7 @@ public class TestRootInputVertexManager {
final List<Integer> scheduledTasks = Lists.newLinkedList();
doAnswer(new ScheduledTasksAnswer(scheduledTasks)).when(
- mockContext).scheduleTasks(anyList());
+
mockContext).scheduleTasks(anyListOf(VertexManagerPluginContext.ScheduleTaskRequest.class));
// source vertices have 0 tasks. immediate start of all managed tasks
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
@@ -568,6 +568,7 @@ public class TestRootInputVertexManager {
public Object answer(InvocationOnMock invocation) throws IOException {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
+ @SuppressWarnings("unchecked")
List<VertexManagerPluginContext.ScheduleTaskRequest> tasks =
(List<VertexManagerPluginContext.ScheduleTaskRequest>)args[0];
for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5ae9556..fb7872f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -43,15 +43,11 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
@@ -2425,10 +2421,10 @@ public class TestVertexImpl {
dagConf);
}
} else {
- v = new VertexImplWithRunningInputInitializerWithExecutor(vertexId,
vPlan, vPlan.getName(), conf,
- dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
- clock, thh, appContext, locationHint, dispatcher,
customInitializer, updateTracker,
- dagConf, vertexGroups);
+ v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
+ clock, thh, true, appContext, locationHint, vertexGroups,
taskSpecificLaunchCmdOption,
+ updateTracker, dagConf);
}
vertices.put(vName, v);
vertexIdMap.put(vertexId, v);
@@ -2532,10 +2528,8 @@ public class TestVertexImpl {
Mockito.doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
- if (args[0] instanceof CallableEvent) {
- CallableEvent e = (CallableEvent) args[0];
- dispatcher.getEventHandler().handle(e);
- }
+ CallableEvent e = (CallableEvent) args[0];
+ dispatcher.getEventHandler().handle(e);
return mockFuture;
}})
.when(execService).submit((Callable<Void>) any());
@@ -2766,13 +2760,12 @@ public class TestVertexImpl {
}
@Test(timeout=5000)
- public void testNonExistInputInitializer() throws Exception {
+ public void testNonExistInputInitializer() throws TezException {
setupPreDagCreation();
dagPlan = createDAGPlanWithNonExistInputInitializer();
setupPostDagCreation();
VertexImpl v1 = vertices.get("vertex1");
v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
- while (v1.getTerminationCause() == null) Thread.sleep(10);
Assert.assertEquals(VertexState.FAILED, v1.getState());
Assert.assertEquals(VertexTerminationCause.INIT_FAILURE,
v1.getTerminationCause());
Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"")
@@ -5850,43 +5843,6 @@ public class TestVertexImpl {
}
@SuppressWarnings("rawtypes")
- private static class VertexImplWithRunningInputInitializerWithExecutor
extends VertexImpl {
- private RootInputInitializerManagerWithExecutor
rootInputInitializerManager;
-
- public VertexImplWithRunningInputInitializerWithExecutor(TezVertexID
vertexId,
- VertexPlan
vertexPlan, String vertexName,
- Configuration
conf,
- EventHandler
eventHandler,
-
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
- Clock clock,
TaskHeartbeatHandler thh,
- AppContext
appContext,
-
VertexLocationHint vertexLocationHint,
- DrainDispatcher
dispatcher,
- InputInitializer
presetInitializer,
-
StateChangeNotifier updateTracker,
- Configuration
dagConf,
- Map<String,
VertexGroupInfo> vertexGroups) {
- super(vertexId, vertexPlan, vertexName, conf, eventHandler,
- taskCommunicatorManagerInterface, clock, thh, true,
- appContext, vertexLocationHint, vertexGroups,
taskSpecificLaunchCmdOption,
- updateTracker, dagConf);
- }
-
- @Override
- protected RootInputInitializerManager createRootInputInitializerManager(
- String dagName, String vertexName, TezVertexID vertexID,
- EventHandler eventHandler, int numTasks, int numNodes,
- Resource taskResource, Resource totalResource) {
- try {
- rootInputInitializerManager = new
RootInputInitializerManagerWithExecutor(this, this.getAppContext(),
stateChangeNotifier);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return rootInputInitializerManager;
- }
- }
-
- @SuppressWarnings("rawtypes")
private static class VertexImplWithControlledInitializerManager extends
VertexImpl {
private final DrainDispatcher dispatcher;
@@ -5942,11 +5898,9 @@ public class TestVertexImpl {
IOException {
super(vertex, appContext, UserGroupInformation.getCurrentUser(),
tracker);
this.presetInitializer = presetInitializer;
- ExecutorService rawExecutor = Executors.newCachedThreadPool(new
ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("Test App Shared Pool - " +
"#%d").build());
- this.executor = MoreExecutors.listeningDecorator(rawExecutor);
}
+
@Override
protected InputInitializer createInitializer(
RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
@@ -5956,31 +5910,6 @@ public class TestVertexImpl {
}
return presetInitializer;
}
-
- @Override
- public void shutdown() {
- super.shutdown();
- if (executor != null) {
- executor.shutdown();
- }
- }
- }
-
- private static class RootInputInitializerManagerWithExecutor extends
RootInputInitializerManager {
- public RootInputInitializerManagerWithExecutor(Vertex vertex, AppContext
appContext, StateChangeNotifier tracker) throws IOException {
- super(vertex, appContext, UserGroupInformation.getCurrentUser(),
tracker);
- ExecutorService rawExecutor = Executors.newCachedThreadPool(new
ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("Test App Shared Pool - " +
"#%d").build());
- this.executor = MoreExecutors.listeningDecorator(rawExecutor);
- }
-
- @Override
- public void shutdown() {
- super.shutdown();
- if (executor != null) {
- executor.shutdown();
- }
- }
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -6002,15 +5931,12 @@ public class TestVertexImpl {
this.eventHandler = eventHandler;
this.dispatcher = dispatcher;
this.vertexID = vertex.getVertexId();
- ExecutorService rawExecutor = Executors.newCachedThreadPool(new
ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("Test App Shared Pool - " +
"#%d").build());
- this.executor = MoreExecutors.listeningDecorator(rawExecutor);
}
@Override
public void runInputInitializers(
- List<RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
- this.inputs = inputs;
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputList) {
+ this.inputs = inputList;
}
@Override
@@ -6035,13 +5961,10 @@ public class TestVertexImpl {
@Override
public void shutdown() {
hasShutDown = true;
- if (executor != null) {
- executor.shutdown();
- }
}
public void failInputInitialization() throws TezException {
- super.runInputInitializers(inputs, Collections.emptyList());
+ super.runInputInitializers(inputs);
eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
.get(0).getName(),
new AMUserCodeException(Source.InputInitializer,