DRILL-1432: Propagate user credentials from client to the fragments

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a44cb917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a44cb917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a44cb917

Branch: refs/heads/master
Commit: a44cb9170c660cd9ee66750b0917d562942df1fb
Parents: 1c40b5e
Author: Mehant Baid <meha...@gmail.com>
Authored: Wed Sep 17 16:50:42 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Mon Sep 29 18:21:44 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   | 24 ++++++++++++++++++--
 .../apache/drill/exec/ops/FragmentContext.java  |  7 ++++++
 .../planner/fragment/SimpleParallelizer.java    |  9 +++++---
 .../apache/drill/exec/rpc/user/UserClient.java  |  6 +++--
 .../apache/drill/exec/rpc/user/UserSession.java |  4 ++++
 .../apache/drill/exec/work/foreman/Foreman.java |  2 +-
 .../drill/exec/pop/TestFragmentChecker.java     |  5 +++-
 7 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 74cc6a6..2c9d2fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
@@ -199,7 +200,7 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
   private void connect(DrillbitEndpoint endpoint) throws RpcException {
     FutureHandler f = new FutureHandler();
     try {
-      client.connect(f, endpoint, props);
+      client.connect(f, endpoint, props, getUserCredentials());
       f.checkedGet();
     } catch (InterruptedException e) {
       throw new RpcException(e);
@@ -247,6 +248,26 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
     return listener.getResults();
   }
 
+
+  /*
+   * Helper method to generate the UserCredentials message from the properties.
+   */
+  private UserBitShared.UserCredentials getUserCredentials() {
+    // If username is not propagated as one of the properties
+    String userName = "anonymous";
+
+    if (props != null) {
+      for (Property property: props.getPropertiesList()) {
+        if (property.getKey().equalsIgnoreCase("user")) {
+          userName = property.getValue();
+          break;
+        }
+      }
+    }
+
+    return 
UserBitShared.UserCredentials.newBuilder().setUserName(userName).build();
+  }
+
   public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
     logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
     return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
@@ -350,5 +371,4 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
     }
 
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 04e1937..0564c1a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -75,6 +76,7 @@ public class FragmentContext implements Closeable {
   private final long queryStartTime;
   private final int rootFragmentTimeZone;
   private final OptionManager fragmentOptions;
+  private final UserCredentials credentials;
   private LongObjectOpenHashMap<DrillBuf> managedBuffers = new 
LongObjectOpenHashMap<>();
 
   private volatile Throwable failureCause;
@@ -91,6 +93,7 @@ public class FragmentContext implements Closeable {
     this.funcRegistry = funcRegistry;
     this.queryStartTime = fragment.getQueryStartTime();
     this.rootFragmentTimeZone = fragment.getTimeZone();
+    this.credentials = fragment.getCredentials();
     logger.debug("Getting initial memory allocation of {}", 
fragment.getMemInitial());
     logger.debug("Fragment max allocation: {}", fragment.getMemMax());
     try {
@@ -259,6 +262,10 @@ public class FragmentContext implements Closeable {
     return funcRegistry;
   }
 
+  public UserCredentials getCredentials() {
+    return credentials;
+  }
+
   public QueryClassLoader getClassLoader() {
     return loader;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index bf4dae7..cd37c17 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -34,7 +34,9 @@ import 
org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
 
@@ -89,13 +91,13 @@ public class SimpleParallelizer {
    * @throws ExecutionSetupException
    */
   public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint 
foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints,
-      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) 
throws ExecutionSetupException {
+      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, 
UserSession session) throws ExecutionSetupException {
     assignEndpoints(activeEndpoints, planningSet);
-    return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, 
planningSet);
+    return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, 
planningSet, session);
   }
 
   private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint 
foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
-                                         PlanningSet planningSet) throws 
ExecutionSetupException {
+                                         PlanningSet planningSet, UserSession 
session) throws ExecutionSetupException {
 
     List<PlanFragment> fragments = Lists.newArrayList();
 
@@ -157,6 +159,7 @@ public class SimpleParallelizer {
             .setMemInitial(wrapper.getInitialAllocation())//
             .setMemMax(wrapper.getMaxAllocation())
             .setOptionsJson(optionsData)
+            .setCredentials(session.getCredentials())
             .build();
 
         if (isRootNode) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 4df6bfe..4e7fc92 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.EventLoopGroup;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
@@ -55,12 +56,13 @@ public class UserClient extends 
BasicClientWithConnection<RpcType, UserToBitHand
     send(queryResultHandler.getWrappedListener(resultsListener), 
RpcType.RUN_QUERY, query, QueryId.class);
   }
 
-  public void connect(RpcConnectionHandler<ServerConnection> handler, 
DrillbitEndpoint endpoint, UserProperties props)
+  public void connect(RpcConnectionHandler<ServerConnection> handler, 
DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials 
credentials)
       throws RpcException, InterruptedException {
     UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
         .setRpcVersion(UserRpcConfig.RPC_VERSION)
         .setSupportListening(true)
-        .setSupportComplexTypes(supportComplexTypes);
+        .setSupportComplexTypes(supportComplexTypes)
+        .setCredentials(credentials);
 
     if (props != null) {
       hsBuilder.setProperties(props);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index d196743..efb0cdf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -99,6 +99,10 @@ public class UserSession {
     return user;
   }
 
+  public UserCredentials getCredentials() {
+    return credentials;
+  }
+
   /**
    * Update the schema path for the session.
    * @param fullPath The desired path to set to.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 1e5d8b8..0a34a22 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -373,7 +373,7 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
       }
 
       QueryWorkUnit work = 
parallelizer.getFragments(context.getOptions().getOptionList(), 
context.getCurrentEndpoint(),
-          queryId, context.getActiveEndpoints(), context.getPlanReader(), 
rootFragment, planningSet);
+          queryId, context.getActiveEndpoints(), context.getPlanReader(), 
rootFragment, planningSet, initiatingClient.getSession());
 
       
this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(),
 fragmentManager);
       List<PlanFragment> leafFragments = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a44cb917/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 58ddd06..6349b76 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -26,7 +26,9 @@ import 
org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.junit.Test;
@@ -59,7 +61,8 @@ public class TestFragmentChecker extends PopUnitTestBase{
       endpoints.add(b1);
     }
 
-    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, 
QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet);
+    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, 
QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet,
+        
UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build());
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] 
=========", qwu.getRootFragment().getHandle().getMajorFragmentId(), 
qwu.getRootFragment().getHandle().getMinorFragmentId()));
 
     System.out.print(qwu.getRootFragment().getFragmentJson());

Reply via email to