This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new b2a0511  TEZ-4253: Revert TEZ-4170 (Mustafa İman via Attila Magyar, 
Ashutosh Chauhan)
b2a0511 is described below

commit b2a05115dcce28a2b188435b445b6d7e43103ab1
Author: Mustafa İman <[email protected]>
AuthorDate: Mon Dec 7 22:40:07 2020 +0100

    TEZ-4253: Revert TEZ-4170 (Mustafa İman via Attila Magyar, Ashutosh Chauhan)
    
    Signed-off-by: Laszlo Bodor <[email protected]>
---
 .../dag/app/dag/RootInputInitializerManager.java   | 274 +++++++++++----------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |  25 +-
 .../app/dag/TestRootInputInitializerManager.java   | 116 +--------
 .../tez/dag/app/dag/impl/TestDAGRecovery.java      |   6 +-
 .../app/dag/impl/TestRootInputVertexManager.java   |   5 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java       |  99 +-------
 6 files changed, 180 insertions(+), 345 deletions(-)

diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index bd4bcd8..3c4a05e 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -18,33 +18,35 @@
 
 package org.apache.tez.dag.app.dag;
 
-import static org.apache.tez.dag.app.dag.VertexState.FAILED;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Objects;
 
-import javax.annotation.Nullable;
+import org.apache.tez.common.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -52,38 +54,38 @@ import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.*;
 import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
-import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
-import org.apache.tez.dag.app.dag.impl.VertexImpl;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
 
 public class RootInputInitializerManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RootInputInitializerManager.class);
 
-  @VisibleForTesting
-  protected ListeningExecutorService executor;
+  private final ExecutorService rawExecutor;
+  private final ListeningExecutorService executor;
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private volatile boolean isStopped = false;
@@ -94,135 +96,51 @@ public class RootInputInitializerManager {
   private final AppContext appContext;
 
   @VisibleForTesting
-  final Map<String, InitializerWrapper> initializerMap = new 
ConcurrentHashMap<>();
+  final Map<String, InitializerWrapper> initializerMap = new HashMap<String, 
InitializerWrapper>();
 
   public RootInputInitializerManager(Vertex vertex, AppContext appContext,
                                      UserGroupInformation dagUgi, 
StateChangeNotifier stateTracker) {
     this.appContext = appContext;
     this.vertex = vertex;
     this.eventHandler = appContext.getEventHandler();
-    this.executor = appContext.getExecService();
+    this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("InputInitializer {" + 
this.vertex.getName() + "} #%d").build());
+    this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     this.dagUgi = dagUgi;
     this.entityStateTracker = stateTracker;
   }
 
+  public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, 
InputInitializerDescriptor>>
+      inputs) throws TezException {
+    for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
input : inputs) {
 
-  public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, 
InputInitializerDescriptor>> inputs,
-                                   List<TezEvent> pendingInitializerEvents) {
-    List<InitializerWrapper> initWrappers = createInitializerWrappers(inputs);
-    if (!initWrappers.isEmpty()) {
-      executor.submit(() -> 
createAndStartInitializing(pendingInitializerEvents, initWrappers));
-    }
-  }
-
-  /**
-   * Create input wrappers for all inputs in parallel.
-   *
-   * @param inputs
-   * @return
-   */
-  protected List<InitializerWrapper> createInitializerWrappers(
-          List<RootInputLeafOutput<InputDescriptor, 
InputInitializerDescriptor>> inputs) {
-    String current = null;
-    final List<InitializerWrapper> result = Collections.synchronizedList(new 
ArrayList<>());
-    try {
-      final List<Future<Void>> fResults = new ArrayList<>();
-      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
each : inputs) {
-        current = each.getName();
-        fResults.add(executor.submit(() -> {
-          InitializerWrapper initializer = createInitializerWrapper(each);
-          initializerMap.put(each.getName(), initializer);
-          registerPendingVertex(each, initializer);
-          result.add(initializer);
-          return null;
-        }));
-      }
-      for(Future<Void> f : fResults) {
-        f.get();
-      }
-    } catch (InterruptedException | ExecutionException t) {
-      failVertex(t, current);
-    }
-    return result;
-  }
-
-  void failVertex(Throwable t, String inputName) {
-    VertexImpl vertexImpl = (VertexImpl) vertex;
-    String msg = "Fail to create InputInitializerManager, " + 
ExceptionUtils.getStackTrace(t);
-    LOG.info(msg);
-    vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
-    eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(), 
inputName,
-        new AMUserCodeException(AMUserCodeException.Source.InputInitializer, 
t)));
-  }
+      InputInitializerContext context =
+          new TezRootInputInitializerContextImpl(input, vertex, appContext, 
this);
 
-  /**
-   * Start initializers in parallel.
-   *
-   * @param pendingInitializerEvents
-   * @param result
-   */
-  protected void createAndStartInitializing(List<TezEvent> 
pendingInitializerEvents, List<InitializerWrapper> result) {
-    handleInitializerEvents(pendingInitializerEvents);
-    pendingInitializerEvents.clear();
-    for (InitializerWrapper inputWrapper : result) {
-      executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
-    }
-  }
-
-  private void runInitializerAndProcessResult(InitializerWrapper initializer) {
-    try {
-      List<Event> result = runInitializer(initializer);
-      LOG.info("Succeeded InputInitializer for Input: " + 
initializer.getInput().getName() +
-                  " on vertex " + initializer.getVertexLogIdentifier());
-      eventHandler.handle(new 
VertexEventRootInputInitialized(vertex.getVertexId(),
-          initializer.getInput().getName(), result));
-    } catch (Throwable t) {
-        if (t instanceof UndeclaredThrowableException) {
-          t = t.getCause();
-        }
-        LOG.info("Failed InputInitializer for Input: " + 
initializer.getInput().getName() +
-                    " on vertex " + initializer.getVertexLogIdentifier());
-        eventHandler.handle(new 
VertexEventRootInputFailed(vertex.getVertexId(), 
initializer.getInput().getName(),
-                    new AMUserCodeException(Source.InputInitializer,t)));
-    } finally {
-      initializer.setComplete();
-    }
-  }
-
-  private List<Event> runInitializer(InitializerWrapper initializer) throws 
IOException, InterruptedException {
-    return dagUgi.doAs((PrivilegedExceptionAction<List<Event>>) () -> {
-      LOG.info(
-              "Starting InputInitializer for Input: " + 
initializer.getInput().getName() +
-                      " on vertex " + initializer.getVertexLogIdentifier());
+      InputInitializer initializer;
       try {
-        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
-                initializer.vertexId);
-        return initializer.getInitializer().initialize();
+        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), 
vertex.getVertexId());
+        initializer = createInitializer(input, context);
       } finally {
         appContext.getHadoopShim().clearHadoopCallerContext();
       }
-    });
-  }
 
-  private InitializerWrapper 
createInitializerWrapper(RootInputLeafOutput<InputDescriptor, 
InputInitializerDescriptor> input) throws TezException {
-    InputInitializerContext context =
-            new TezRootInputInitializerContextImpl(input, vertex, appContext, 
this);
-    try {
-      TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), 
vertex.getVertexId());
-      InputInitializer initializer = createInitializer(input, context);
-      return new InitializerWrapper(input, initializer, context, vertex, 
entityStateTracker, appContext);
-    } finally {
-      appContext.getHadoopShim().clearHadoopCallerContext();
-    }
-  }
+      InitializerWrapper initializerWrapper =
+          new InitializerWrapper(input, initializer, context, vertex, 
entityStateTracker, appContext);
 
-  private void registerPendingVertex(RootInputLeafOutput<InputDescriptor, 
InputInitializerDescriptor> input, InitializerWrapper initializerWrapper) {
-    // Register pending vertex update registrations
-    List<VertexUpdateRegistrationHolder> vertexUpdateRegistrations = 
pendingVertexRegistrations.removeAll(input.getName());
-    if (vertexUpdateRegistrations != null) {
-      for (VertexUpdateRegistrationHolder h : vertexUpdateRegistrations) {
-        initializerWrapper.registerForVertexStateUpdates(h.vertexName, 
h.stateSet);
+      // Register pending vertex update registrations
+      List<VertexUpdateRegistrationHolder> vertexUpdateRegistrations =
+          pendingVertexRegistrations.removeAll(input.getName());
+      if (vertexUpdateRegistrations != null) {
+        for (VertexUpdateRegistrationHolder h : vertexUpdateRegistrations) {
+          initializerWrapper.registerForVertexStateUpdates(h.vertexName, 
h.stateSet);
+        }
       }
+
+      initializerMap.put(input.getName(), initializerWrapper);
+      ListenableFuture<List<Event>> future = executor
+          .submit(new InputInitializerCallable(initializerWrapper, dagUgi, 
appContext));
+      Futures.addCallback(future, 
createInputInitializerCallback(initializerWrapper), GuavaShim.directExecutor());
     }
   }
 
@@ -316,13 +234,103 @@ public class RootInputInitializerManager {
   }
 
   @VisibleForTesting
+  protected InputInitializerCallback 
createInputInitializerCallback(InitializerWrapper initializer) {
+    return new InputInitializerCallback(initializer, eventHandler, 
vertex.getVertexId());
+  }
+
+  @VisibleForTesting
   @InterfaceAudience.Private
   public InitializerWrapper getInitializerWrapper(String inputName) {
     return initializerMap.get(inputName);
   }
 
   public void shutdown() {
-    isStopped = true;
+    if (executor != null && !isStopped) {
+      // Don't really care about what is running if an error occurs. If no 
error
+      // occurs, all execution is complete.
+      executor.shutdownNow();
+      isStopped = true;
+    }
+  }
+
+  private static class InputInitializerCallable implements
+      Callable<List<Event>> {
+
+    private final InitializerWrapper initializerWrapper;
+    private final UserGroupInformation ugi;
+    private final AppContext appContext;
+
+    InputInitializerCallable(InitializerWrapper initializer, 
UserGroupInformation ugi,
+                                    AppContext appContext) {
+      this.initializerWrapper = initializer;
+      this.ugi = ugi;
+      this.appContext = appContext;
+    }
+
+    @Override
+    public List<Event> call() throws Exception {
+      List<Event> events = ugi.doAs(new 
PrivilegedExceptionAction<List<Event>>() {
+        @Override
+        public List<Event> run() throws Exception {
+          LOG.info(
+              "Starting InputInitializer for Input: " + 
initializerWrapper.getInput().getName() +
+                  " on vertex " + initializerWrapper.getVertexLogIdentifier());
+          try {
+            TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
+                initializerWrapper.vertexId);
+            return initializerWrapper.getInitializer().initialize();
+          } finally {
+            appContext.getHadoopShim().clearHadoopCallerContext();
+          }
+        }
+      });
+      return events;
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @VisibleForTesting
+  private static class InputInitializerCallback implements
+      FutureCallback<List<Event>> {
+
+    private final InitializerWrapper initializer;
+    private final EventHandler eventHandler;
+    private final TezVertexID vertexID;
+
+    InputInitializerCallback(InitializerWrapper initializer,
+        EventHandler eventHandler, TezVertexID vertexID) {
+      this.initializer = initializer;
+      this.eventHandler = eventHandler;
+      this.vertexID = vertexID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onSuccess(List<Event> result) {
+      initializer.setComplete();
+      LOG.info(
+          "Succeeded InputInitializer for Input: " + 
initializer.getInput().getName() +
+              " on vertex " + initializer.getVertexLogIdentifier());
+      eventHandler.handle(new VertexEventRootInputInitialized(vertexID,
+          initializer.getInput().getName(), result));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onFailure(Throwable t) {
+      // catch real root cause of failure, it would throw 
UndeclaredThrowableException
+      // if using UGI.doAs
+      if (t instanceof UndeclaredThrowableException) {
+        t = t.getCause();
+      }
+      initializer.setComplete();
+      LOG.info(
+          "Failed InputInitializer for Input: " + 
initializer.getInput().getName() +
+              " on vertex " + initializer.getVertexLogIdentifier());
+      eventHandler
+          .handle(new VertexEventRootInputFailed(vertexID, 
initializer.getInput().getName(),
+              new AMUserCodeException(Source.InputInitializer, t)));
+    }
   }
 
   @VisibleForTesting
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f86dc98..6ae3ba5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2416,7 +2416,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
-  public VertexState finished(VertexState finalState,
+  VertexState finished(VertexState finalState,
       VertexTerminationCause termCause, String diag) {
     if (finishTime == 0) setFinishTime();
     if (termCause != null) {
@@ -3073,7 +3073,13 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         if (vertex.inputsWithInitializers != null) {
           if (vertex.recoveryData == null || 
!vertex.recoveryData.shouldSkipInit()) {
             LOG.info("Vertex will initialize from input initializer. " + 
vertex.logIdentifier);
-            vertex.setupInputInitializerManager();
+            try {
+              vertex.setupInputInitializerManager();
+            } catch (TezException e) {
+              String msg = "Fail to create InputInitializerManager, " + 
ExceptionUtils.getStackTrace(e);
+              LOG.info(msg);
+              return vertex.finished(VertexState.FAILED, 
VertexTerminationCause.INIT_FAILURE, msg);
+            }
           }
           return VertexState.INITIALIZING;
         } else {
@@ -3106,7 +3112,13 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         if (vertex.inputsWithInitializers != null &&
             (vertex.recoveryData == null || 
!vertex.recoveryData.shouldSkipInit())) {
           LOG.info("Vertex will initialize from input initializer. " + 
vertex.logIdentifier);
-          vertex.setupInputInitializerManager();
+          try {
+            vertex.setupInputInitializerManager();
+          } catch (TezException e) {
+            String msg = "Fail to create InputInitializerManager, " + 
ExceptionUtils.getStackTrace(e);
+            LOG.error(msg);
+            return vertex.finished(VertexState.FAILED, 
VertexTerminationCause.INIT_FAILURE, msg);
+          }
           return VertexState.INITIALIZING;
         }
         if (!vertex.uninitializedEdges.isEmpty()) {
@@ -4243,7 +4255,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
-  private void setupInputInitializerManager() {
+  private void setupInputInitializerManager() throws TezException {
     rootInputInitializerManager = createRootInputInitializerManager(
         getDAG().getName(), getName(), getVertexId(),
         eventHandler, getTotalTasks(),
@@ -4258,7 +4270,10 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers 
for vertex " +
         logIdentifier);
     initWaitsForRootInitializers = true;
-    rootInputInitializerManager.runInputInitializers(inputList, 
pendingInitializerEvents);
+    rootInputInitializerManager.runInputInitializers(inputList);
+    // Send pending rootInputInitializerEvents
+    
rootInputInitializerManager.handleInitializerEvents(pendingInitializerEvents);
+    pendingInitializerEvents.clear();
   }
 
   private static class VertexStateChangedCallback
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
index a197e54..ffa4309 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -14,9 +14,7 @@
 
 package org.apache.tez.dag.app.dag;
 
-import static 
org.apache.tez.dag.app.TestDAGAppMaster.DAGAppMasterForTest.createCredentials;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
@@ -27,43 +25,20 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.client.TezApiVersionInfo;
-import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -75,34 +50,11 @@ import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 public class TestRootInputInitializerManager {
 
-  private static final File TEST_DIR = new 
File(System.getProperty("test.build.data"),
-          TestRootInputInitializerManager.class.getName()).getAbsoluteFile();
-  private static ListeningExecutorService execService;
-
-  @Before
-  public void setUp() {
-    ExecutorService rawExecutor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder()
-            .setDaemon(true).setNameFormat("Test App Shared Pool - " + 
"#%d").build());
-    execService = MoreExecutors.listeningDecorator(rawExecutor);
-    FileUtil.fullyDelete(TEST_DIR);
-    TEST_DIR.mkdirs();
-  }
-
-  @After
-  public void tearDown() {
-    if (execService != null) {
-      execService.shutdownNow();
-    }
-    FileUtil.fullyDelete(TEST_DIR);
-  }
-
   // Simple testing. No events if task doesn't succeed.
   // Also exercises path where two attempts are reported as successful via the 
stateChangeNotifier.
   // Primarily a failure scenario, when a Task moves back to running from 
success
@@ -262,7 +214,6 @@ public class TestRootInputInitializerManager {
     AppContext appContext = mock(AppContext.class);
     doReturn(new DefaultHadoopShim()).when(appContext).getHadoopShim();
     doReturn(mock(EventHandler.class)).when(appContext).getEventHandler();
-    when(appContext.getExecService()).thenReturn(execService);
     UserGroupInformation dagUgi = 
UserGroupInformation.createRemoteUser("fakeuser");
     StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
     RootInputInitializerManager rootInputInitializerManager = new 
RootInputInitializerManager(vertex, appContext, dagUgi, stateChangeNotifier);
@@ -271,73 +222,12 @@ public class TestRootInputInitializerManager {
     InputInitializerDescriptor iid = 
InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
     RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput 
=
         new RootInputLeafOutput<>("InputName", id, iid);
-    
rootInputInitializerManager.runInputInitializers(Collections.singletonList(rootInput),
 Collections.emptyList());
+    
rootInputInitializerManager.runInputInitializers(Collections.singletonList(rootInput));
 
     InputInitializerForUgiTest.awaitInitialize();
 
     assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi);
-    assertEquals(dagUgi.getRealUser(), 
InputInitializerForUgiTest.initializeUgi.getRealUser());
-  }
-
-  @Test (timeout = 10000)
-  public synchronized void testParallelInputInitialization() throws 
InterruptedException, IOException {
-    // Create Local DAGAppMaster with default conf
-    Configuration conf = new Configuration(true);
-    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
-
-    FileSystem fs = FileSystem.getLocal(conf);
-    FSDataOutputStream sessionJarsPBOutStream =
-            TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(),
-                    TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
-    DAGProtos.PlanLocalResourcesProto.getDefaultInstance()
-            .writeDelimitedTo(sessionJarsPBOutStream);
-    sessionJarsPBOutStream.close();
-
-    ApplicationId appId = ApplicationId.newInstance(1, 1);
-    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 
1);
-
-    DAGAppMaster am = new DAGAppMaster(attemptId,
-            ContainerId.newContainerId(attemptId, 1),
-            "127.0.0.1", 0, 0, new SystemClock(), 1, true,
-            TEST_DIR.toString(), new String[] {TEST_DIR.toString()},
-            new String[] {TEST_DIR.toString()},
-            new TezApiVersionInfo().getVersion(), createCredentials(),
-            "someuser", null);
-    am.init(conf);
-
-    Vertex vertex = mock(Vertex.class);
-    doReturn(mock(TezVertexID.class)).when(vertex).getVertexId();
-    UserGroupInformation dagUgi = 
UserGroupInformation.createRemoteUser("fakeuser");
-    StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
-    RootInputInitializerManager rootInputInitializerManager =
-            new RootInputInitializerManager(vertex, am.getContext(), dagUgi, 
stateChangeNotifier);
-
-    List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
inlist = new LinkedList();
-    // Make sure we dont have any OOM issue by controlling the capacity of the 
thread pool
-    // and also block producer (createInitializerWrapper when resources are 
saturated)
-    InputDescriptor id = mock(InputDescriptor.class);
-    InputInitializerDescriptor iid = 
InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
-    for (int i=0; i < 10000; i++) {
-      RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
rootInput =
-              new RootInputLeafOutput<>("InputName"+i, id, iid);
-      inlist.add(rootInput);
-    }
-
-    List<RootInputInitializerManager.InitializerWrapper> initWrappers =
-            rootInputInitializerManager.createInitializerWrappers(inlist);
-
-    int maxThreadSize = 
conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
-            TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
-    ThreadPoolExecutor amThreadPool = am.getContext().getThreadPool();
-
-    rootInputInitializerManager.executor.submit(() -> 
rootInputInitializerManager
-        .createAndStartInitializing(Collections.emptyList(), initWrappers));
-
-    while (am.getContext().getThreadPool().getQueue().size() > 0) {
-      assertTrue(amThreadPool.getPoolSize() <= maxThreadSize);
-      Thread.sleep(100);
-    }
+    assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi);
   }
 
   public static class InputInitializerForUgiTest extends InputInitializer {
@@ -379,4 +269,4 @@ public class TestRootInputInitializerManager {
       }
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 9636329..95ea8a0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -330,10 +330,8 @@ public class TestDAGRecovery {
     Mockito.doAnswer(new Answer() {
       public ListenableFuture<Void> answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
-        if (args[0] instanceof CallableEvent) {
-          CallableEvent e = (CallableEvent) args[0];
-          dispatcher.getEventHandler().handle(e);
-        }
+        CallableEvent e = (CallableEvent) args[0];
+        dispatcher.getEventHandler().handle(e);
         return mockFuture;
       }
     }).when(execService).submit((Callable<Void>) any());
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
index 16a97d4..39a291e 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
@@ -24,7 +24,7 @@ import static 
org.apache.tez.dag.app.dag.impl.RootInputVertexManager.TEZ_ROOT_IN
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -176,7 +176,7 @@ public class TestRootInputVertexManager {
 
     final List<Integer> scheduledTasks = Lists.newLinkedList();
     doAnswer(new ScheduledTasksAnswer(scheduledTasks)).when(
-        mockContext).scheduleTasks(anyList());
+        
mockContext).scheduleTasks(anyListOf(VertexManagerPluginContext.ScheduleTaskRequest.class));
 
     // source vertices have 0 tasks. immediate start of all managed tasks
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
@@ -568,6 +568,7 @@ public class TestRootInputVertexManager {
     public Object answer(InvocationOnMock invocation) throws IOException {
       Object[] args = invocation.getArguments();
       scheduledTasks.clear();
+      @SuppressWarnings("unchecked")
       List<VertexManagerPluginContext.ScheduleTaskRequest> tasks =
           (List<VertexManagerPluginContext.ScheduleTaskRequest>)args[0];
       for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5ae9556..fb7872f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -43,15 +43,11 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.StringUtils;
@@ -2425,10 +2421,10 @@ public class TestVertexImpl {
               dagConf);
         }
       } else {
-        v = new VertexImplWithRunningInputInitializerWithExecutor(vertexId, 
vPlan, vPlan.getName(), conf,
-                dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
-                clock, thh, appContext, locationHint, dispatcher, 
customInitializer, updateTracker,
-                dagConf, vertexGroups);
+        v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
+            dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
+            clock, thh, true, appContext, locationHint, vertexGroups, 
taskSpecificLaunchCmdOption,
+            updateTracker, dagConf);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -2532,10 +2528,8 @@ public class TestVertexImpl {
     Mockito.doAnswer(new Answer() {
       public ListenableFuture<Void> answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
-          if (args[0] instanceof CallableEvent) {
-            CallableEvent e = (CallableEvent) args[0];
-            dispatcher.getEventHandler().handle(e);
-          }
+          CallableEvent e = (CallableEvent) args[0];
+          dispatcher.getEventHandler().handle(e);
           return mockFuture;
       }})
     .when(execService).submit((Callable<Void>) any());
@@ -2766,13 +2760,12 @@ public class TestVertexImpl {
   }
 
   @Test(timeout=5000)
-  public void testNonExistInputInitializer() throws Exception {
+  public void testNonExistInputInitializer() throws TezException {
     setupPreDagCreation();
     dagPlan = createDAGPlanWithNonExistInputInitializer();
     setupPostDagCreation();
     VertexImpl v1 = vertices.get("vertex1");
     v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
-    while (v1.getTerminationCause() == null) Thread.sleep(10);
     Assert.assertEquals(VertexState.FAILED, v1.getState());
     Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, 
v1.getTerminationCause());
     Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"")
@@ -5850,43 +5843,6 @@ public class TestVertexImpl {
   }
 
   @SuppressWarnings("rawtypes")
-  private static class VertexImplWithRunningInputInitializerWithExecutor 
extends VertexImpl {
-    private RootInputInitializerManagerWithExecutor 
rootInputInitializerManager;
-
-    public VertexImplWithRunningInputInitializerWithExecutor(TezVertexID 
vertexId,
-                                                             VertexPlan 
vertexPlan, String vertexName,
-                                                             Configuration 
conf,
-                                                             EventHandler 
eventHandler,
-                                                             
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
-                                                             Clock clock, 
TaskHeartbeatHandler thh,
-                                                             AppContext 
appContext,
-                                                             
VertexLocationHint vertexLocationHint,
-                                                             DrainDispatcher 
dispatcher,
-                                                             InputInitializer 
presetInitializer,
-                                                             
StateChangeNotifier updateTracker,
-                                                             Configuration 
dagConf,
-                                                             Map<String, 
VertexGroupInfo> vertexGroups) {
-      super(vertexId, vertexPlan, vertexName, conf, eventHandler,
-              taskCommunicatorManagerInterface, clock, thh, true,
-              appContext, vertexLocationHint, vertexGroups, 
taskSpecificLaunchCmdOption,
-              updateTracker, dagConf);
-    }
-
-    @Override
-    protected RootInputInitializerManager createRootInputInitializerManager(
-            String dagName, String vertexName, TezVertexID vertexID,
-            EventHandler eventHandler, int numTasks, int numNodes,
-            Resource taskResource, Resource totalResource) {
-      try {
-        rootInputInitializerManager = new 
RootInputInitializerManagerWithExecutor(this, this.getAppContext(), 
stateChangeNotifier);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      return rootInputInitializerManager;
-    }
-  }
-
-  @SuppressWarnings("rawtypes")
   private static class VertexImplWithControlledInitializerManager extends 
VertexImpl {
     
     private final DrainDispatcher dispatcher;
@@ -5942,11 +5898,9 @@ public class TestVertexImpl {
         IOException {
       super(vertex, appContext, UserGroupInformation.getCurrentUser(), 
tracker);
       this.presetInitializer = presetInitializer;
-      ExecutorService rawExecutor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder()
-              .setDaemon(true).setNameFormat("Test App Shared Pool - " + 
"#%d").build());
-      this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     }
 
+
     @Override
     protected InputInitializer createInitializer(
         RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
@@ -5956,31 +5910,6 @@ public class TestVertexImpl {
       }
       return presetInitializer;
     }
-
-    @Override
-    public void shutdown() {
-      super.shutdown();
-      if (executor != null) {
-        executor.shutdown();
-      }
-    }
-  }
-
-  private static class RootInputInitializerManagerWithExecutor extends 
RootInputInitializerManager {
-    public RootInputInitializerManagerWithExecutor(Vertex vertex, AppContext 
appContext, StateChangeNotifier tracker) throws IOException {
-      super(vertex, appContext, UserGroupInformation.getCurrentUser(), 
tracker);
-      ExecutorService rawExecutor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder()
-              .setDaemon(true).setNameFormat("Test App Shared Pool - " + 
"#%d").build());
-      this.executor = MoreExecutors.listeningDecorator(rawExecutor);
-    }
-
-    @Override
-    public void shutdown() {
-      super.shutdown();
-      if (executor != null) {
-        executor.shutdown();
-      }
-    }
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
@@ -6002,15 +5931,12 @@ public class TestVertexImpl {
       this.eventHandler = eventHandler;
       this.dispatcher = dispatcher;
       this.vertexID = vertex.getVertexId();
-      ExecutorService rawExecutor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder()
-              .setDaemon(true).setNameFormat("Test App Shared Pool - " + 
"#%d").build());
-      this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     }
 
     @Override
     public void runInputInitializers(
-            List<RootInputLeafOutput<InputDescriptor, 
InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
-      this.inputs = inputs;
+        List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
inputList) {
+      this.inputs = inputList;
     }
 
     @Override
@@ -6035,13 +5961,10 @@ public class TestVertexImpl {
     @Override
     public void shutdown() {
       hasShutDown = true;
-      if (executor != null) {
-        executor.shutdown();
-      }
     }
 
     public void failInputInitialization() throws TezException {
-      super.runInputInitializers(inputs, Collections.emptyList());
+      super.runInputInitializers(inputs);
       eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
           .get(0).getName(),
           new AMUserCodeException(Source.InputInitializer,

Reply via email to