TEZ-3025. InputInitializer creation should use the dag ugi. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5c9649e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5c9649e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5c9649e Branch: refs/heads/TEZ-2980 Commit: d5c9649e5849021dd1bfdb796d821f7a1524aaf0 Parents: 9816a49 Author: Siddharth Seth <[email protected]> Authored: Thu Jan 7 17:38:40 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Jan 7 17:38:40 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../app/dag/RootInputInitializerManager.java | 32 +++++++-- .../dag/TestRootInputInitializerManager.java | 71 ++++++++++++++++++++ 3 files changed, 98 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ce35fd1..5e944b6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-3025. InputInitializer creation should use the dag ugi. TEZ-3017. HistoryACLManager does not have a close method for cleanup TEZ-2914. Ability to limit vertex concurrency TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex @@ -301,6 +302,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-3025. InputInitializer creation should use the dag ugi. TEZ-3017. HistoryACLManager does not have a close method for cleanup TEZ-2914. Ability to limit vertex concurrency TEZ-2918. Make progress notifications in IOs http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 57a7172..e03b469 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag; import javax.annotation.Nullable; +import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.util.Collection; @@ -107,7 +108,7 @@ public class RootInputInitializerManager { this.entityStateTracker = stateTracker; } - public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> + public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) throws TezException { for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) { @@ -141,12 +142,29 @@ public class RootInputInitializerManager { } @VisibleForTesting - protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> - input, InputInitializerContext context) throws TezException { - InputInitializer initializer = ReflectionUtils - .createClazzInstance(input.getControllerDescriptor().getClassName(), - new Class[]{InputInitializerContext.class}, new Object[]{context}); - return initializer; + protected InputInitializer createInitializer(final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> + input, final InputInitializerContext context) throws TezException { + try { + return dagUgi.doAs(new PrivilegedExceptionAction<InputInitializer>() { + @Override + public InputInitializer run() throws Exception { + InputInitializer initializer = ReflectionUtils + .createClazzInstance(input.getControllerDescriptor().getClassName(), + new Class[]{InputInitializerContext.class}, new Object[]{context}); + return initializer; + } + }); + } catch (IOException e) { + throw new TezException(e); + } catch (InterruptedException e) { + throw new TezException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof TezException) { + throw (TezException) e.getCause(); + } else { + throw e; + } + } } public void handleInitializerEvents(List<TezEvent> events) { http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java index 89eb2a6..b79b4af 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java @@ -25,19 +25,26 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Collections; import java.util.List; import com.google.common.collect.Lists; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; +import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputInitializer; import org.apache.tez.runtime.api.InputInitializerContext; import org.apache.tez.runtime.api.events.InputInitializerEvent; @@ -198,4 +205,68 @@ public class TestRootInputInitializerManager { verify(initializer, never()).handleInputInitializerEvent(any(List.class)); } + + + @Test (timeout = 5000) + public void testCorrectUgiUsage() throws TezException, InterruptedException { + Vertex vertex = mock(Vertex.class); + doReturn(mock(TezVertexID.class)).when(vertex).getVertexId(); + AppContext appContext = mock(AppContext.class); + doReturn(new DefaultHadoopShim()).when(appContext).getHadoopShim(); + doReturn(mock(EventHandler.class)).when(appContext).getEventHandler(); + UserGroupInformation dagUgi = UserGroupInformation.createRemoteUser("fakeuser"); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + RootInputInitializerManager rootInputInitializerManager = new RootInputInitializerManager(vertex, appContext, dagUgi, stateChangeNotifier); + + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName()); + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput = + new RootInputLeafOutput<>("InputName", id, iid); + rootInputInitializerManager.runInputInitializers(Collections.singletonList(rootInput)); + + InputInitializerForUgiTest.awaitInitialize(); + + assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi); + assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi); + } + + public static class InputInitializerForUgiTest extends InputInitializer { + + static volatile UserGroupInformation ctorUgi; + static volatile UserGroupInformation initializeUgi; + + static boolean initialized = false; + static final Object initializeSync = new Object(); + + public InputInitializerForUgiTest(InputInitializerContext initializerContext) { + super(initializerContext); + try { + ctorUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public List<Event> initialize() throws Exception { + initializeUgi = UserGroupInformation.getCurrentUser(); + synchronized (initializeSync) { + initialized = true; + initializeSync.notify(); + } + return null; + } + + @Override + public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { + } + + static void awaitInitialize() throws InterruptedException { + synchronized (initializeSync) { + while (!initialized) { + initializeSync.wait(); + } + } + } + } }
