DRILL-1432: Propagate user credentials from client to the fragments
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a44cb917 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a44cb917 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a44cb917 Branch: refs/heads/master Commit: a44cb9170c660cd9ee66750b0917d562942df1fb Parents: 1c40b5e Author: Mehant Baid <meha...@gmail.com> Authored: Wed Sep 17 16:50:42 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Mon Sep 29 18:21:44 2014 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/client/DrillClient.java | 24 ++++++++++++++++++-- .../apache/drill/exec/ops/FragmentContext.java | 7 ++++++ .../planner/fragment/SimpleParallelizer.java | 9 +++++--- .../apache/drill/exec/rpc/user/UserClient.java | 6 +++-- .../apache/drill/exec/rpc/user/UserSession.java | 4 ++++ .../apache/drill/exec/work/foreman/Foreman.java | 2 +- .../drill/exec/pop/TestFragmentChecker.java | 5 +++- 7 files changed, 48 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 74cc6a6..2c9d2fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.proto.UserProtos; @@ -199,7 +200,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ private void connect(DrillbitEndpoint endpoint) throws RpcException { FutureHandler f = new FutureHandler(); try { - client.connect(f, endpoint, props); + client.connect(f, endpoint, props, getUserCredentials()); f.checkedGet(); } catch (InterruptedException e) { throw new RpcException(e); @@ -247,6 +248,26 @@ public class DrillClient implements Closeable, ConnectionThrottle{ return listener.getResults(); } + + /* + * Helper method to generate the UserCredentials message from the properties. + */ + private UserBitShared.UserCredentials getUserCredentials() { + // If username is not propagated as one of the properties + String userName = "anonymous"; + + if (props != null) { + for (Property property: props.getPropertiesList()) { + if (property.getKey().equalsIgnoreCase("user")) { + userName = property.getValue(); + break; + } + } + } + + return UserBitShared.UserCredentials.newBuilder().setUserName(userName).build(); + } + public DrillRpcFuture<Ack> cancelQuery(QueryId id) { logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id)); return client.send(RpcType.CANCEL_QUERY, id, Ack.class); @@ -350,5 +371,4 @@ public class DrillClient implements Closeable, ConnectionThrottle{ } } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 04e1937..0564c1a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -40,6 +40,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared.UserCredentials; import org.apache.drill.exec.rpc.control.ControlTunnel; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; @@ -75,6 +76,7 @@ public class FragmentContext implements Closeable { private final long queryStartTime; private final int rootFragmentTimeZone; private final OptionManager fragmentOptions; + private final UserCredentials credentials; private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>(); private volatile Throwable failureCause; @@ -91,6 +93,7 @@ public class FragmentContext implements Closeable { this.funcRegistry = funcRegistry; this.queryStartTime = fragment.getQueryStartTime(); this.rootFragmentTimeZone = fragment.getTimeZone(); + this.credentials = fragment.getCredentials(); logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); logger.debug("Fragment max allocation: {}", fragment.getMemMax()); try { @@ -259,6 +262,10 @@ public class FragmentContext implements Closeable { return funcRegistry; } + public UserCredentials getCredentials() { + return credentials; + } + public QueryClassLoader getClassLoader() { return loader; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index bf4dae7..cd37c17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -34,7 +34,9 @@ import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.work.QueryWorkUnit; @@ -89,13 +91,13 @@ public class SimpleParallelizer { * @throws ExecutionSetupException */ public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, - PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) throws ExecutionSetupException { + PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, UserSession session) throws ExecutionSetupException { assignEndpoints(activeEndpoints, planningSet); - return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet); + return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session); } private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode, - PlanningSet planningSet) throws ExecutionSetupException { + PlanningSet planningSet, UserSession session) throws ExecutionSetupException { List<PlanFragment> fragments = Lists.newArrayList(); @@ -157,6 +159,7 @@ public class SimpleParallelizer { .setMemInitial(wrapper.getInitialAllocation())// .setMemMax(wrapper.getMaxAllocation()) .setOptionsJson(optionsData) + .setCredentials(session.getCredentials()) .build(); if (isRootNode) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 4df6bfe..4e7fc92 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -23,6 +23,7 @@ import io.netty.channel.EventLoopGroup; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; @@ -55,12 +56,13 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class); } - public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props) + public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials) throws RpcException, InterruptedException { UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() .setRpcVersion(UserRpcConfig.RPC_VERSION) .setSupportListening(true) - .setSupportComplexTypes(supportComplexTypes); + .setSupportComplexTypes(supportComplexTypes) + .setCredentials(credentials); if (props != null) { hsBuilder.setProperties(props); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index d196743..efb0cdf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -99,6 +99,10 @@ public class UserSession { return user; } + public UserCredentials getCredentials() { + return credentials; + } + /** * Update the schema path for the session. * @param fullPath The desired path to set to. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 1e5d8b8..0a34a22 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -373,7 +373,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(), - queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet); + queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, initiatingClient.getSession()); this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager); List<PlanFragment> leafFragments = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java index 58ddd06..6349b76 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java @@ -26,7 +26,9 @@ import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.fragment.StatsCollector; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.work.QueryWorkUnit; import org.junit.Test; @@ -59,7 +61,8 @@ public class TestFragmentChecker extends PopUnitTestBase{ endpoints.add(b1); } - QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet); + QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, + UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build()); System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId())); System.out.print(qwu.getRootFragment().getFragmentJson());