DRILL-6036: Create sys.connections table

Introduces sys.connections, a distributed system table (i.e. each Drillbit
executes a fragment of the scan) that lists all active client connections to
every Drillbit in the cluster. The UserServer maintains a static (class-level)
map of these connections: an entry is added when a connection is established
and removed when that connection is closed.

The following details are provided by the table:
    user
    targetUser (later removed from the listing; see "Removed listing of targetUser" below)
    client
    drillbit
    established
    duration
    queries
    isAuthenticated
    isEncrypted
    usingSSL
    session

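Added a security check to prevent unauthorized access to the connection
listing.

When user authentication is enabled and the querying user does not have admin
privileges, only those connections whose session targetUser matches the
querying (inbound impersonated) user name are listed; otherwise all
connections are listed.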

Removed listing of targetUser
Removed getQueryContext from FragmentContext
FragmentContext provides isUserAuthenticationEnabled()

This closes #1076


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/93621eb2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/93621eb2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/93621eb2

Branch: refs/heads/master
Commit: 93621eb2491de911b751fe3489d17004ff4acc8a
Parents: 0b17fbf
Author: Kunal Khatua <[email protected]>
Authored: Wed Dec 20 10:28:15 2017 -0800
Committer: Parth Chandra <[email protected]>
Committed: Mon Jan 8 17:28:35 2018 -0800

----------------------------------------------------------------------
 .../apache/drill/exec/rpc/user/UserServer.java  |  70 ++++++++-
 .../drill/exec/server/DrillbitContext.java      |   9 ++
 .../store/sys/BitToUserConnectionIterator.java  | 141 +++++++++++++++++++
 .../drill/exec/store/sys/SystemTable.java       |   7 +
 .../drill/exec/store/sys/TestSystemTable.java   |   7 +-
 .../work/metadata/TestMetadataProvider.java     |   2 +-
 6 files changed, 229 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/93621eb2/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 2b50e50..58d9df0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,17 +19,16 @@ package org.apache.drill.exec.rpc.user;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.net.ssl.SSLEngine;
 import javax.security.sasl.SaslException;
 
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.ssl.SslHandler;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.exceptions.DrillException;
-import org.apache.drill.exec.ssl.SSLConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
@@ -59,17 +58,22 @@ import 
org.apache.drill.exec.rpc.security.plain.PlainFactory;
 import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
 import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
 import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.ssl.SSLConfig;
 import org.apache.drill.exec.ssl.SSLConfigBuilder;
 import org.apache.drill.exec.work.user.UserWorker;
 import org.apache.hadoop.security.HadoopKerberosName;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 
 import com.google.protobuf.MessageLite;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -81,6 +85,12 @@ public class UserServer extends BasicServer<RpcType, 
BitToUserConnection> {
   private final SSLConfig sslConfig;
   private Channel sslChannel;
   private final UserWorker userWorker;
+  private static final ConcurrentHashMap<BitToUserConnection, 
BitToUserConnectionConfig> userConnectionMap;
+
+  //Initializing the singleton map during startup
+  static {
+    userConnectionMap = new ConcurrentHashMap<>();
+  }
 
   public UserServer(BootStrapContext context, BufferAllocator allocator, 
EventLoopGroup eventLoopGroup,
                     UserWorker worker) throws DrillbitStartupException {
@@ -145,6 +155,14 @@ public class UserServer extends BasicServer<RpcType, 
BitToUserConnection> {
   }
 
   /**
+   * Access to set of active connection details for this instance of the 
Drillbit
+   * @return Active connection set
+   */
+  public static Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> 
getUserConnections() {
+    return userConnectionMap.entrySet();
+  }
+
+  /**
    * {@link AbstractRemoteConnection} implementation for user connection. Also 
implements {@link UserClientConnection}.
    */
   public class BitToUserConnection extends 
AbstractServerConnection<BitToUserConnection>
@@ -263,13 +281,23 @@ public class UserServer extends BasicServer<RpcType, 
BitToUserConnection> {
     @Override
     public void decConnectionCounter() {
       UserRpcMetrics.getInstance().decConnectionCount();
+      //Removing entry in connection map (sys table)
+      userConnectionMap.remove(this);
     }
   }
 
   @Override
   protected BitToUserConnection initRemoteConnection(SocketChannel channel) {
     super.initRemoteConnection(channel);
-    return new BitToUserConnection(channel);
+    return registerAndGetConnection(channel);
+  }
+
+  private BitToUserConnection registerAndGetConnection(SocketChannel channel) {
+    BitToUserConnection bit2userConn = new BitToUserConnection(channel);
+    if (bit2userConn != null) {
+      userConnectionMap.put(bit2userConn, new BitToUserConnectionConfig());
+    }
+    return bit2userConn;
   }
 
   @Override
@@ -426,4 +454,36 @@ public class UserServer extends BasicServer<RpcType, 
BitToUserConnection> {
     return new UserProtobufLengthDecoder(allocator, outOfMemoryHandler);
   }
 
+  /**
+   * User Connection's config for System Table access
+   */
+  public class BitToUserConnectionConfig {
+    private DateTime established;
+    private boolean isAuthEnabled;
+    private boolean isEncryptionEnabled;
+    private boolean isSSLEnabled;
+
+    public BitToUserConnectionConfig() {
+      established = new DateTime(); //Current Joda-based Time
+      isAuthEnabled = config.isAuthEnabled();
+      isEncryptionEnabled = config.isEncryptionEnabled();
+      isSSLEnabled = config.isSSLEnabled();
+    }
+
+    public boolean isAuthEnabled() {
+      return isAuthEnabled;
+    }
+
+    public boolean isEncryptionEnabled() {
+      return isEncryptionEnabled;
+    }
+
+    public boolean isSSLEnabled() {
+      return isSSLEnabled;
+    }
+
+    public DateTime getEstablished() {
+      return established;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/93621eb2/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index f65592b..6189068 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -35,6 +35,9 @@ import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.rpc.security.AuthenticatorProvider;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
+import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnectionConfig;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -43,6 +46,8 @@ import org.apache.drill.exec.work.foreman.rm.ResourceManager;
 import org.apache.drill.exec.work.foreman.rm.ResourceManagerBuilder;
 
 import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -279,6 +284,10 @@ public class DrillbitContext implements AutoCloseable {
     return context.getAuthProvider();
   }
 
+  public Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> 
getUserConnections() {
+    return UserServer.getUserConnections();
+  }
+
   @Override
   public void close() throws Exception {
     getOptionManager().close();

http://git-wip-us.apache.org/repos/asf/drill/blob/93621eb2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
new file mode 100644
index 0000000..15a56a2
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.TimeZone;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
+import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnectionConfig;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.rest.profile.SimpleDurationFormat;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.joda.time.DateTime;
+
+/**
+ * Add a system table for listing connected users on a cluster
+ */
+public class BitToUserConnectionIterator implements Iterator<Object> {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BitToUserConnectionIterator.class);
+
+  Iterator<ConnectionInfo> itr;
+  private String queryingUsername;
+  private boolean isAdmin;
+
+  public BitToUserConnectionIterator(FragmentContext context) {
+    queryingUsername = context.getQueryUserName();
+    isAdmin = hasAdminPrivileges(context);
+    itr = iterateConnectionInfo(context);
+  }
+
+  private boolean hasAdminPrivileges(FragmentContext context) {
+    OptionManager options = context.getOptions();
+    if (context.isUserAuthenticationEnabled() &&
+        !ImpersonationUtil.hasAdminPrivileges(
+          this.queryingUsername,
+          ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
+          
ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options))) {
+      return false;
+    }
+
+    //Passed checks
+    return true;
+  }
+
+  private Iterator<ConnectionInfo> iterateConnectionInfo(FragmentContext 
context) {
+    Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> 
activeConnections =
+        context.getDrillbitContext().getUserConnections();
+
+    String hostname = context.getIdentity().getAddress();
+    List<ConnectionInfo> connectionInfos = new LinkedList<ConnectionInfo>();
+
+    for (Entry<BitToUserConnection, BitToUserConnectionConfig> connection : 
activeConnections) {
+      if ( isAdmin ||
+          this.queryingUsername.equals(
+              connection.getKey().getSession().getTargetUserName()) ) {
+        connectionInfos.add(new ConnectionInfo(connection, hostname));
+      }
+    }
+
+    return connectionInfos.iterator();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return itr.hasNext();
+  }
+
+  @Override
+  public Object next() {
+    return itr.next();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  public static class ConnectionInfo {
+    public String user;
+    public String client;
+    public String drillbit;
+    public Timestamp established;
+    public String duration;
+    public int queries;
+    public boolean isAuthenticated;
+    public boolean isEncrypted;
+    public boolean usingSSL;
+    public String session;
+
+    public ConnectionInfo(Entry<BitToUserConnection, 
BitToUserConnectionConfig> connectionConfigPair, String hostname) {
+      BitToUserConnection connection = connectionConfigPair.getKey();
+      BitToUserConnectionConfig config = connectionConfigPair.getValue();
+      UserSession userSession = connection.getSession();
+      this.user = userSession.getCredentials().getUserName();
+      DateTime dateTime = config.getEstablished();
+      this.established = new Timestamp(
+          dateTime
+          .plusMillis(TimeZone.getDefault().getOffset(dateTime.getMillis())) 
//Adjusting for -Duser.timezone
+          .getMillis());
+      this.duration = (new SimpleDurationFormat(dateTime.getMillis(), 
System.currentTimeMillis()))
+          .verbose();
+      this.client = extractIpAddr(connection.getRemoteAddress().toString());
+      this.drillbit = hostname;
+      this.session = userSession.getSessionId();
+      this.queries = userSession.getQueryCount();
+      this.isAuthenticated = config.isAuthEnabled();
+      this.isEncrypted = config.isEncryptionEnabled();
+      this.usingSSL = config.isSSLEnabled();
+    }
+
+    private String extractIpAddr(String clientAddrString) {
+      String ipAddr = clientAddrString
+          .replaceFirst("/","") //Remove any leading '/'
+          .split(":")[0]; //Remove any connected port reference
+      return ipAddr;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/93621eb2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 8803304..034f70c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -87,6 +87,13 @@ public enum SystemTable {
     }
   },
 
+  CONNECTIONS("connections", true, 
BitToUserConnectionIterator.ConnectionInfo.class) {
+    @Override
+    public Iterator<Object> getIterator(final FragmentContext context) {
+      return new BitToUserConnectionIterator(context);
+    }
+  },
+
   PROFILES("profiles", false, ProfileInfoIterator.ProfileInfo.class) {
     @Override
     public Iterator<Object> getIterator(final FragmentContext context) {

http://git-wip-us.apache.org/repos/asf/drill/blob/93621eb2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index d39d2ec..62f5f4f 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -71,6 +71,11 @@ public class TestSystemTable extends BaseTestQuery {
   }
 
   @Test
+  public void connectionsTable() throws Exception {
+    test("select * from sys.connections");
+  }
+
+  @Test
   public void profilesTable() throws Exception {
     test("select * from sys.profiles");
   }
@@ -79,4 +84,4 @@ public class TestSystemTable extends BaseTestQuery {
   public void profilesJsonTable() throws Exception {
     test("select * from sys.profiles_json");
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/93621eb2/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index b2240fc..463bfe5 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -248,7 +248,7 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<ColumnMetadata> columns = resp.getColumnsList();
-    assertEquals(118, columns.size());
+    assertEquals(117, columns.size());
     // too many records to verify the output.
   }
 

Reply via email to