Repository: tez Updated Branches: refs/heads/branch-0.7 d80e30d3a -> 150d494da
TEZ-3025. InputInitializer creation should use the dag ugi. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/150d494d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/150d494d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/150d494d Branch: refs/heads/branch-0.7 Commit: 150d494da3402bb20d936cf340f1bc2bb31a019f Parents: d80e30d Author: Siddharth Seth <[email protected]> Authored: Thu Jan 7 17:39:34 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Jan 7 17:39:34 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../app/dag/RootInputInitializerManager.java | 32 +++++++-- .../dag/TestRootInputInitializerManager.java | 69 ++++++++++++++++++++ 3 files changed, 95 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/150d494d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c2364a7..489580f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES + TEZ-3025. InputInitializer creation should use the dag ugi. TEZ-2972. Avoid task rescheduling when a node turns unhealthy TEZ-3017. HistoryACLManager does not have a close method for cleanup TEZ-2914. Ability to limit vertex concurrency http://git-wip-us.apache.org/repos/asf/tez/blob/150d494d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 13128f8..6624455 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag; import javax.annotation.Nullable; +import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.util.Collection; @@ -106,7 +107,7 @@ public class RootInputInitializerManager { this.entityStateTracker = stateTracker; } - public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> + public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) throws TezException { for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) { @@ -133,12 +134,29 @@ public class RootInputInitializerManager { } @VisibleForTesting - protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> - input, InputInitializerContext context) throws TezException { - InputInitializer initializer = ReflectionUtils - .createClazzInstance(input.getControllerDescriptor().getClassName(), - new Class[]{InputInitializerContext.class}, new Object[]{context}); - return initializer; + protected InputInitializer createInitializer(final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> + input, final InputInitializerContext context) throws TezException { + try { + return dagUgi.doAs(new PrivilegedExceptionAction<InputInitializer>() { + @Override + public InputInitializer run() throws Exception { + InputInitializer initializer = ReflectionUtils + .createClazzInstance(input.getControllerDescriptor().getClassName(), + new Class[]{InputInitializerContext.class}, new Object[]{context}); + return initializer; + } + }); + } catch (IOException e) { + throw new TezException(e); + } catch (InterruptedException e) { + throw new TezException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof TezException) { + throw (TezException) e.getCause(); + } else { + throw e; + } + } } public void handleInitializerEvents(List<TezEvent> events) { http://git-wip-us.apache.org/repos/asf/tez/blob/150d494d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java index 89eb2a6..0dee5dc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java @@ -25,19 +25,25 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Collections; import java.util.List; import com.google.common.collect.Lists; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputInitializer; import org.apache.tez.runtime.api.InputInitializerContext; import org.apache.tez.runtime.api.events.InputInitializerEvent; @@ -198,4 +204,67 @@ public class TestRootInputInitializerManager { verify(initializer, never()).handleInputInitializerEvent(any(List.class)); } + + + @Test (timeout = 5000) + public void testCorrectUgiUsage() throws TezException, InterruptedException { + Vertex vertex = mock(Vertex.class); + doReturn(mock(TezVertexID.class)).when(vertex).getVertexId(); + AppContext appContext = mock(AppContext.class); + doReturn(mock(EventHandler.class)).when(appContext).getEventHandler(); + UserGroupInformation dagUgi = UserGroupInformation.createRemoteUser("fakeuser"); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + RootInputInitializerManager rootInputInitializerManager = new RootInputInitializerManager(vertex, appContext, dagUgi, stateChangeNotifier); + + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName()); + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput = + new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid); + rootInputInitializerManager.runInputInitializers(Collections.singletonList(rootInput)); + + InputInitializerForUgiTest.awaitInitialize(); + + assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi); + assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi); + } + + public static class InputInitializerForUgiTest extends InputInitializer { + + static volatile UserGroupInformation ctorUgi; + static volatile UserGroupInformation initializeUgi; + + static boolean initialized = false; + static final Object initializeSync = new Object(); + + public InputInitializerForUgiTest(InputInitializerContext initializerContext) { + super(initializerContext); + try { + ctorUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public List<Event> initialize() throws Exception { + initializeUgi = UserGroupInformation.getCurrentUser(); + synchronized (initializeSync) { + initialized = true; + initializeSync.notify(); + } + return null; + } + + @Override + public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { + } + + static void awaitInitialize() throws InterruptedException { + synchronized (initializeSync) { + while (!initialized) { + initializeSync.wait(); + } + } + } + } }
