RANGER-261 Fix audit to HDFS in Kerberos environment

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/859f3bd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/859f3bd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/859f3bd3

Branch: refs/heads/tag-policy
Commit: 859f3bd3ec7a9098b27429646547d1d52f0f353c
Parents: b12e761
Author: Don Bosco Durai <[email protected]>
Authored: Thu Jun 4 21:55:44 2015 -0700
Committer: Don Bosco Durai <[email protected]>
Committed: Fri Jun 5 08:26:48 2015 -0700

----------------------------------------------------------------------
 .../audit/destination/HDFSAuditDestination.java |   6 +-
 .../audit/provider/AuditProviderFactory.java    |   3 +-
 .../apache/ranger/audit/provider/MiscUtil.java  | 132 +++++++++++++++++--
 .../ranger/audit/queue/AuditAsyncQueue.java     |  27 +++-
 .../ranger/audit/queue/AuditBatchQueue.java     |  26 +++-
 .../ranger/audit/queue/AuditFileSpool.java      |  25 +++-
 .../apache/ranger/audit/queue/AuditQueue.java   |   8 ++
 .../ranger/audit/queue/AuditSummaryQueue.java   |  28 +++-
 .../plugin/audit/RangerDefaultAuditHandler.java |   3 +-
 .../service-defs/ranger-servicedef-kafka.json   |  29 +++-
 .../service-defs/ranger-servicedef-solr.json    |   8 +-
 .../scripts/ranger-admin-services.sh            |   4 +-
 .../hadoop/RangerHdfsAuthorizer.java            |   3 +-
 kms/scripts/ranger-kms                          |   2 +-
 plugin-kafka/conf/kafka-ranger-env.sh           |   2 +-
 .../kafka/authorizer/RangerKafkaAuthorizer.java |  86 ++++++++----
 .../service/filter/RangerRESTAPIFilter.java     |   9 --
 17 files changed, 327 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index e79e42d..4fc3a0b 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -119,6 +119,10 @@ public class HDFSAuditDestination extends AuditDestination 
{
                }
 
                try {
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("UGI=" + MiscUtil.getUGILoginUser()
+                                               + ". Will write to HDFS file=" 
+ currentFileName);
+                       }
                        PrintWriter out = getLogFileStream();
                        for (String event : events) {
                                out.println(event);
@@ -154,7 +158,7 @@ public class HDFSAuditDestination extends AuditDestination {
                                jsonList.add(MiscUtil.stringify(event));
                        } catch (Throwable t) {
                                logger.error("Error converting to JSON. event=" 
+ event);
-                               addTotalCount(1);                               
+                               addTotalCount(1);
                                addFailedCount(1);
                                logFailedEvent(event);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index f02ba62..1146e0b 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -125,9 +125,8 @@ public class AuditProviderFactory {
 
                List<AuditHandler> providers = new ArrayList<AuditHandler>();
 
-               // TODO: Delete me
                for (Object propNameObj : props.keySet()) {
-                       LOG.info("DELETE ME: " + propNameObj.toString() + "="
+                       LOG.info("AUDIT PROPERTY: " + propNameObj.toString() + 
"="
                                        + 
props.getProperty(propNameObj.toString()));
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
index abb0a90..907b9b8 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -17,17 +17,25 @@
 package org.apache.ranger.audit.provider;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.rmi.dgc.VMID;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.UUID;
 
+import javax.security.auth.Subject;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
 
@@ -35,6 +43,8 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
 public class MiscUtil {
+       private static final Log logger = LogFactory.getLog(MiscUtil.class);
+
        public static final String TOKEN_START = "%";
        public static final String TOKEN_END = "%";
        public static final String TOKEN_HOSTNAME = "hostname";
@@ -51,6 +61,11 @@ public class MiscUtil {
 
        private static Gson sGsonBuilder = null;
        private static String sApplicationType = null;
+       private static UserGroupInformation ugiLoginUser = null;
+       private static Subject subjectLoginUser = null;
+
+       private static Map<String, LogHistory> logHistoryList = new 
Hashtable<String,LogHistory>();
+       private static int logInterval = 30000; // 30 seconds
 
        static {
                try {
@@ -371,26 +386,127 @@ public class MiscUtil {
        public static List<String> toArray(String destListStr, String delim) {
                List<String> list = new ArrayList<String>();
                if (destListStr != null && !destListStr.isEmpty()) {
-                       StringTokenizer tokenizer = new 
StringTokenizer(destListStr, delim.trim());
+                       StringTokenizer tokenizer = new 
StringTokenizer(destListStr,
+                                       delim.trim());
                        while (tokenizer.hasMoreTokens()) {
                                list.add(tokenizer.nextToken());
                        }
                }
                return list;
        }
-       
-       public static String getCredentialString(String url,String alias) {
+
+       public static String getCredentialString(String url, String alias) {
                String ret = null;
 
-               if(url != null && alias != null) {
-                       char[] cred = 
RangerCredentialProvider.getInstance().getCredentialString(url,alias);
+               if (url != null && alias != null) {
+                       char[] cred = RangerCredentialProvider.getInstance()
+                                       .getCredentialString(url, alias);
 
-                       if ( cred != null ) {
-                               ret = new String(cred); 
+                       if (cred != null) {
+                               ret = new String(cred);
                        }
                }
-               
+
                return ret;
        }
 
+       /**
+        * @param ugiLoginUser
+        */
+       public static void setUGILoginUser(UserGroupInformation newUGI, Subject 
newSubject) {
+               if (newUGI != null) {
+                       UserGroupInformation.setLoginUser(newUGI);
+                       ugiLoginUser = newUGI;
+                       logger.info("Setting UGI=" + newUGI );
+               } else {
+                       logger.error("UGI is null. Not setting it.");
+                       ugiLoginUser = null;
+               }
+               logger.info("Setting SUBJECT");
+               subjectLoginUser = newSubject;
+       }
+
+       public static UserGroupInformation getUGILoginUser() {
+               if (ugiLoginUser == null) {
+                       try {
+                               ugiLoginUser = 
UserGroupInformation.getLoginUser();
+                       } catch (IOException e) {
+                               logger.error("Error getting UGI.", e);
+                       }
+               }
+               return ugiLoginUser;
+       }
+
+       
+       public static Subject getSubjectLoginUser() {
+               return subjectLoginUser;
+       }
+
+       /**
+        * @param userName
+        * @return
+        */
+       static public Set<String> getGroupsForRequestUser(String userName) {
+               if (userName == null) {
+                       return null;
+               }
+               try {
+                       UserGroupInformation ugi = UserGroupInformation
+                                       .createRemoteUser(userName);
+                       // UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
+                       String groups[] = ugi.getGroupNames();
+                       if (groups != null && groups.length > 0) {
+                               java.util.Set<String> groupsSet = new 
java.util.HashSet<String>();
+                               for (int i = 0; i < groups.length; i++) {
+                                       groupsSet.add(groups[i]);
+                               }
+                               return groupsSet;
+                       }
+               } catch (Throwable e) {
+                       logErrorMessageByInterval(
+                                       logger, "Error getting groups for 
users. userName=" + userName, e);
+               }
+               return null;
+       }
+
+       static public boolean logErrorMessageByInterval(Log useLogger, String 
message) {
+               return logErrorMessageByInterval(useLogger, message, null);
+       }
+
+       /**
+        * @param string
+        * @param e
+        */
+       static public boolean logErrorMessageByInterval(Log useLogger, String 
message, Throwable e) {
+               LogHistory log = logHistoryList.get(message);
+               if (log == null) {
+                       log = new LogHistory();
+                       logHistoryList.put(message, log);
+               }
+               if ((System.currentTimeMillis() - log.lastLogTime) > 
logInterval) {
+                       log.lastLogTime = System.currentTimeMillis();
+                       int counter = log.counter;
+                       log.counter = 0;
+                       if (counter > 0) {
+                               message += ". Messages suppressed before: " + 
counter;
+                       }
+                       if (e == null) {
+                               useLogger.error(message);
+                       } else {
+                               useLogger.error(message, e);
+                       }
+                       
+                       return true;
+               } else {
+                       log.counter++;
+               }
+               return false;
+
+       }
+
+       static class LogHistory {
+               long lastLogTime = 0;
+               int counter = 0;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index 47480da..9b4fcbd 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.audit.queue;
 
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -27,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.provider.AuditHandler;
+import org.apache.ranger.audit.provider.MiscUtil;
 
 /**
  * This is a non-blocking queue with no limit on capacity.
@@ -124,6 +126,26 @@ public class AuditAsyncQueue extends AuditQueue implements 
Runnable {
         */
        @Override
        public void run() {
+               try {
+                       if (isConsumerDestination) {
+                               PrivilegedAction<Void> action = new 
PrivilegedAction<Void>() {
+                                       public Void run() {
+                                               runDoAs();
+                                               return null;
+                                       };
+                               };
+                               logger.info("Running queue " + getName() + " as 
user "
+                                               + MiscUtil.getUGILoginUser());
+                               MiscUtil.getUGILoginUser().doAs(action);
+                       } else {
+                               runDoAs();
+                       }
+               } catch (Throwable t) {
+                       logger.fatal("Exited thread abnormaly. queue=" + 
getName(), t);
+               }
+       }
+
+       public void runDoAs() {
                while (true) {
                        try {
                                AuditEventBase event = null;
@@ -141,9 +163,7 @@ public class AuditAsyncQueue extends AuditQueue implements 
Runnable {
                                        consumer.log(eventList);
                                }
                        } catch (InterruptedException e) {
-                               logger.info(
-                                               "Caught exception in consumer 
thread. Shutdown might be in progress",
-                                               e);
+                               logger.info("Caught exception in consumer 
thread. Shutdown might be in progress");
                        } catch (Throwable t) {
                                logger.error("Caught error during processing 
request.", t);
                        }
@@ -172,7 +192,6 @@ public class AuditAsyncQueue extends AuditQueue implements 
Runnable {
                        logger.error("Error while calling stop on consumer.", 
t);
                }
                logger.info("Exiting consumerThread.run() method. name=" + 
getName());
-
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 2361bbf..dca3030 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.audit.queue;
 
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Properties;
@@ -30,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.provider.AuditHandler;
+import org.apache.ranger.audit.provider.MiscUtil;
 
 public class AuditBatchQueue extends AuditQueue implements Runnable {
        private static final Log logger = 
LogFactory.getLog(AuditBatchQueue.class);
@@ -206,6 +208,26 @@ public class AuditBatchQueue extends AuditQueue implements 
Runnable {
         */
        @Override
        public void run() {
+               try {
+                       if (isConsumerDestination) {
+                               PrivilegedAction<Void> action = new 
PrivilegedAction<Void>() {
+                                       public Void run() {
+                                               runDoAs();
+                                               return null;
+                                       };
+                               };
+                               logger.info("Running queue " + getName() + " as 
user "
+                                               + MiscUtil.getUGILoginUser());
+                               MiscUtil.getUGILoginUser().doAs(action);
+                       } else {
+                               runDoAs();
+                       }
+               } catch (Throwable t) {
+                       logger.fatal("Exited thread abnormaly. queue=" + 
getName(), t);
+               }
+       }
+
+       public void runDoAs() {
                long lastDispatchTime = System.currentTimeMillis();
                boolean isDestActive = true;
                while (true) {
@@ -265,9 +287,7 @@ public class AuditBatchQueue extends AuditQueue implements 
Runnable {
                                        lastDispatchTime = 
System.currentTimeMillis();
                                }
                        } catch (InterruptedException e) {
-                               logger.info(
-                                               "Caught exception in consumer 
thread. Shutdown might be in progress",
-                                               e);
+                               logger.info("Caught exception in consumer 
thread. Shutdown might be in progress");
                                setDrain(true);
                        } catch (Throwable t) {
                                logger.error("Caught error during processing 
request.", t);

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
index 9c10c54..4a4e707 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
@@ -28,6 +28,7 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -108,6 +109,8 @@ public class AuditFileSpool implements Runnable {
        boolean isDrain = false;
        boolean isDestDown = true;
 
+       int ugiVersion = -1;
+
        private Gson gson = null;
 
        public AuditFileSpool(AuditQueue queueProvider,
@@ -751,9 +754,27 @@ public class AuditFileSpool implements Runnable {
         */
        @Override
        public void run() {
+               try {
+                       PrivilegedAction<Void> action = new 
PrivilegedAction<Void>() {
+                               public Void run() {
+                                       runDoAs();
+                                       return null;
+                               };
+                       };
+                       logger.info("Running fileSpool " + 
consumerProvider.getName()
+                                       + " as user " + 
MiscUtil.getUGILoginUser());
+                       MiscUtil.getUGILoginUser().doAs(action);
+               } catch (Throwable t) {
+                       logger.fatal("Exited thread without abnormaly. queue="
+                                       + consumerProvider.getName(), t);
+               }
+       }
+
+       public void runDoAs() {
                // boolean isResumed = false;
                while (true) {
                        try {
+
                                // Let's pause between each iteration
                                if (currentConsumerIndexRecord == null) {
                                        currentConsumerIndexRecord = 
indexQueue.poll(
@@ -843,9 +864,7 @@ public class AuditFileSpool implements Runnable {
                                        closeFileIfNeeded();
                                }
                        } catch (InterruptedException e) {
-                               logger.info(
-                                               "Caught exception in consumer 
thread. Shutdown might be in progress",
-                                               e);
+                               logger.info("Caught exception in consumer 
thread. Shutdown might be in progress");
                        } catch (Throwable t) {
                                logger.error("Exception in destination writing 
thread.", t);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
index 057f192..000a658 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.provider.AuditHandler;
 import org.apache.ranger.audit.provider.BaseAuditHandler;
 import org.apache.ranger.audit.provider.MiscUtil;
@@ -61,6 +62,7 @@ public abstract class AuditQueue extends BaseAuditHandler {
        protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
        protected int fileSpoolDrainThresholdPercent = 80;
 
+       boolean isConsumerDestination = false;
        // This is set when the first time stop is called.
        protected long stopTime = 0;
 
@@ -73,6 +75,12 @@ public abstract class AuditQueue extends BaseAuditHandler {
                        BaseAuditHandler baseAuditHander = (BaseAuditHandler) 
consumer;
                        baseAuditHander.setParentPath(getName());
                }
+
+               if (consumer != null && consumer instanceof AuditDestination) {
+                       // If consumer is destination, then the thread should 
run as server
+                       // user
+                       isConsumerDestination = true;
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index f1ce799..56ba55e 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.audit.queue;
 
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -145,6 +146,28 @@ public class AuditSummaryQueue extends AuditQueue 
implements Runnable {
         */
        @Override
        public void run() {
+               try {
+                       if (isConsumerDestination) {
+                               PrivilegedAction<Void> action = new 
PrivilegedAction<Void>() {
+                                       public Void run() {
+                                               runDoAs();
+                                               return null;
+                                       };
+                               };
+                               logger.info("Running queue " + getName() + " as 
user "
+                                               + MiscUtil.getUGILoginUser());
+                               MiscUtil.getUGILoginUser().doAs(action);
+                       } else {
+                               runDoAs();
+                       }
+               } catch (Throwable t) {
+                       logger.fatal("Exited thread without abnormaly. queue=" 
+ getName(),
+                                       t);
+               }
+       }
+
+       public void runDoAs() {
+
                long lastDispatchTime = System.currentTimeMillis();
 
                while (true) {
@@ -174,9 +197,7 @@ public class AuditSummaryQueue extends AuditQueue 
implements Runnable {
                                        lastDispatchTime = 
System.currentTimeMillis();
                                }
                        } catch (InterruptedException e) {
-                               logger.info(
-                                               "Caught exception in consumer 
thread. Shutdown might be in progress",
-                                               e);
+                               logger.info("Caught exception in consumer 
thread. Shutdown might be in progress");
                        } catch (Throwable t) {
                                logger.error("Caught error during processing 
request.", t);
                        }
@@ -243,7 +264,6 @@ public class AuditSummaryQueue extends AuditQueue 
implements Runnable {
                        logger.error("Error while calling stop on consumer.", 
t);
                }
                logger.info("Exiting consumerThread.run() method. name=" + 
getName());
-
        }
 
        class AuditSummary {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-common/src/main/java/org/apache/ranger/plugin/audit/RangerDefaultAuditHandler.java
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/java/org/apache/ranger/plugin/audit/RangerDefaultAuditHandler.java
 
b/agents-common/src/main/java/org/apache/ranger/plugin/audit/RangerDefaultAuditHandler.java
index 844d0ac..c553618 100644
--- 
a/agents-common/src/main/java/org/apache/ranger/plugin/audit/RangerDefaultAuditHandler.java
+++ 
b/agents-common/src/main/java/org/apache/ranger/plugin/audit/RangerDefaultAuditHandler.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.audit.provider.AuditProviderFactory;
 import org.apache.ranger.audit.provider.MiscUtil;
-import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 import org.apache.ranger.plugin.policyengine.RangerAccessResource;
@@ -83,7 +82,7 @@ public class RangerDefaultAuditHandler implements 
RangerAccessResultProcessor {
                RangerAccessRequest request = result != null ? 
result.getAccessRequest() : null;
 
                if(request != null && result != null && result.getIsAudited()) {
-                       RangerServiceDef     serviceDef   = 
result.getServiceDef();
+                       //RangerServiceDef     serviceDef   = 
result.getServiceDef();
                        RangerAccessResource resource     = 
request.getResource();
                        String               resourceType = resource == null ? 
null : resource.getLeafName();
                        String               resourcePath = resource == null ? 
null : resource.getAsString();

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json 
b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index 4752eff..bf7a4df 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -31,17 +31,29 @@
                {
                        "itemId":1,
                        "name":"publish",
-                       "label":"Publish"
+                       "label":"Publish",
+                       "impliedGrants":[
+                               "describe"
+                       ]
+                       
                },
                {
                        "itemId":2,
                        "name":"consume",
-                       "label":"Consume"
+                       "label":"Consume",
+                       "impliedGrants":[
+                               "describe"
+                       ]
+                       
                },
                {
                        "itemId":5,
                        "name":"configure",
-                       "label":"Configure"
+                       "label":"Configure",
+                       "impliedGrants":[
+                               "describe"
+                       ]
+                       
                },
                {
                        "itemId":6,
@@ -51,7 +63,14 @@
                {
                        "itemId":7,
                        "name":"kafka_admin",
-                       "label":"Kafka Admin"
+                       "label":"Kafka Admin",
+                       "impliedGrants":[
+                               "publish",
+                               "consume",
+                               "configure",
+                               "describe"
+                       ]
+                       
                }
                
        ],
@@ -109,4 +128,4 @@
                }
                
        ]
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/agents-common/src/main/resources/service-defs/ranger-servicedef-solr.json
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/resources/service-defs/ranger-servicedef-solr.json 
b/agents-common/src/main/resources/service-defs/ranger-servicedef-solr.json
index 107b5d6..bc27352 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-solr.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-solr.json
@@ -47,7 +47,11 @@
                {
                        "itemId":900,
                        "name":"solr_admin",
-                       "label":"Solr Admin"
+                       "label":"Solr Admin",
+                       "impliedGrants":[
+                               "query,update,others"
+                       ]
+                       
                }
                
        ],
@@ -117,4 +121,4 @@
                }
                
        ]
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/embeddedwebserver/scripts/ranger-admin-services.sh
----------------------------------------------------------------------
diff --git a/embeddedwebserver/scripts/ranger-admin-services.sh 
b/embeddedwebserver/scripts/ranger-admin-services.sh
index 6931dbf..880ff49 100755
--- a/embeddedwebserver/scripts/ranger-admin-services.sh
+++ b/embeddedwebserver/scripts/ranger-admin-services.sh
@@ -53,12 +53,12 @@ then
 fi
 
 start() {
-       java -Dproc_rangeradmin ${JAVA_OPTS} 
-Dlogdir=${XAPOLICYMGR_EWS_DIR}/logs/ -Dcatalina.base=${XAPOLICYMGR_EWS_DIR} 
-cp 
"${XAPOLICYMGR_EWS_DIR}/webapp/WEB-INF/classes/conf:${XAPOLICYMGR_EWS_DIR}/lib/*:${RANGER_JAAS_LIB_DIR}/*:${RANGER_JAAS_CONF_DIR}:${JAVA_HOME}/lib/*"
 org.apache.ranger.server.tomcat.EmbeddedServer > logs/catalina.out 2>&1 &
+       java -Dproc_rangeradmin ${JAVA_OPTS} 
-Dlogdir=${XAPOLICYMGR_EWS_DIR}/logs/ -Dcatalina.base=${XAPOLICYMGR_EWS_DIR} 
-cp 
"${XAPOLICYMGR_EWS_DIR}/webapp/WEB-INF/classes/conf:${XAPOLICYMGR_EWS_DIR}/lib/*:${RANGER_JAAS_LIB_DIR}/*:${RANGER_JAAS_CONF_DIR}:${JAVA_HOME}/lib/*:$CLASSPATH"
 org.apache.ranger.server.tomcat.EmbeddedServer > logs/catalina.out 2>&1 &
        echo "Apache Ranger Admin has started."
 }
 
 stop(){
-       java ${JAVA_OPTS} -Dcatalina.base=${XAPOLICYMGR_EWS_DIR} -cp 
"${XAPOLICYMGR_EWS_DIR}/webapp/WEB-INF/classes/conf:${XAPOLICYMGR_EWS_DIR}/lib/*:${RANGER_JAAS_LIB_DIR}/*:${RANGER_JAAS_CONF_DIR}"
 org.apache.ranger.server.tomcat.StopEmbeddedServer > logs/catalina.out 2>&1
+       java ${JAVA_OPTS} -Dcatalina.base=${XAPOLICYMGR_EWS_DIR} -cp 
"${XAPOLICYMGR_EWS_DIR}/webapp/WEB-INF/classes/conf:${XAPOLICYMGR_EWS_DIR}/lib/*:${RANGER_JAAS_LIB_DIR}/*:${RANGER_JAAS_CONF_DIR}:$CLASSPATH"
 org.apache.ranger.server.tomcat.StopEmbeddedServer > logs/catalina.out 2>&1
        echo "Apache Ranger Admin has been stopped."
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
 
b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
index f6fa8bd..fa2155c 100644
--- 
a/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
+++ 
b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
@@ -49,7 +49,6 @@ import 
org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
 import 
org.apache.ranger.authorization.hadoop.exceptions.RangerAccessControlException;
 import org.apache.ranger.authorization.utils.StringUtil;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
-import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResource;
@@ -478,7 +477,7 @@ class RangerHdfsAuditHandler extends 
RangerDefaultAuditHandler {
                }
 
                RangerAccessRequest  request      = result.getAccessRequest();
-               RangerServiceDef     serviceDef   = result.getServiceDef();
+//             RangerServiceDef     serviceDef   = result.getServiceDef();
                RangerAccessResource resource     = request.getResource();
                String               resourceType = resource != null ? 
resource.getLeafName() : null;
                String               resourcePath = resource != null ? 
resource.getAsString() : null;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/kms/scripts/ranger-kms
----------------------------------------------------------------------
diff --git a/kms/scripts/ranger-kms b/kms/scripts/ranger-kms
index f0fa1b4..805ebcc 100755
--- a/kms/scripts/ranger-kms
+++ b/kms/scripts/ranger-kms
@@ -72,7 +72,7 @@ fi
 
 KMS_CONF_DIR=${RANGER_KMS_EWS_DIR}/webapp/WEB-INF/classes/conf
 
-JAVA_OPTS="${JAVA_OPTS} -Dcatalina.base=${RANGER_KMS_EWS_DIR} 
-Dkms.config.dir=${KMS_CONF_DIR} -Dkms.log.dir=${TOMCAT_LOG_DIR} -cp 
${RANGER_KMS_EWS_CONF_DIR}:${RANGER_KMS_EWS_LIB_DIR}/*:${RANGER_KMS_EWS_DIR}/webapp/lib/*:${JAVA_HOME}/lib/*
 "
+JAVA_OPTS="${JAVA_OPTS} -Dcatalina.base=${RANGER_KMS_EWS_DIR} 
-Dkms.config.dir=${KMS_CONF_DIR} -Dkms.log.dir=${TOMCAT_LOG_DIR} -cp 
${RANGER_KMS_EWS_CONF_DIR}:${RANGER_KMS_EWS_LIB_DIR}/*:${RANGER_KMS_EWS_DIR}/webapp/lib/*:${JAVA_HOME}/lib/*:$CLASSPATH
 "
 
 if [ "${action}" == "START" ]; then
        echo "+ java -D${PROC_NAME} ${JAVA_OPTS} ${START_CLASS_NAME} 
${KMS_CONFIG_FILENAME} "

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/plugin-kafka/conf/kafka-ranger-env.sh
----------------------------------------------------------------------
diff --git a/plugin-kafka/conf/kafka-ranger-env.sh 
b/plugin-kafka/conf/kafka-ranger-env.sh
index 2ec122d..219353d 100755
--- a/plugin-kafka/conf/kafka-ranger-env.sh
+++ b/plugin-kafka/conf/kafka-ranger-env.sh
@@ -17,4 +17,4 @@
 
 curr_dir=`pwd`
 cd `dirname $0`; script_dir=`pwd`; cd $curr_dir
-export CLASSPATH="$CLASSPATH:${script_dir}:/etc/kafka/conf"
+export 
CLASSPATH="$CLASSPATH:${script_dir}:/etc/kafka/conf:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*"

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 0d0cffc..f982bbf 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,8 +19,11 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.security.Principal;
 import java.util.Date;
 
+import javax.security.auth.Subject;
+
 import kafka.security.auth.Acl;
 import kafka.security.auth.Authorizer;
 import kafka.security.auth.KafkaPrincipal;
@@ -28,11 +31,14 @@ import kafka.security.auth.Operation;
 import kafka.security.auth.Resource;
 import kafka.security.auth.ResourceType;
 import kafka.server.KafkaConfig;
+import kafka.common.security.LoginManager;
 import kafka.network.RequestChannel.Session;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.ranger.authorization.utils.StringUtil;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
@@ -64,9 +70,6 @@ public class RangerKafkaAuthorizer implements Authorizer {
        int errorLogFreq = 30000; // Log after every 30 seconds
 
        public RangerKafkaAuthorizer() {
-               if (rangerPlugin == null) {
-                       rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-               }
        }
 
        /*
@@ -76,23 +79,70 @@ public class RangerKafkaAuthorizer implements Authorizer {
         */
        @Override
        public void initialize(KafkaConfig kafkaConfig) {
-               rangerPlugin.init();
-               RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
 
-               rangerPlugin.setResultProcessor(auditHandler);
+               if (rangerPlugin == null) {
+                       rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+
+                       try {
+                               Subject subject = LoginManager.subject();
+                               logger.info("SUBJECT "
+                                               + (subject == null ? "not 
found" : "found"));
+                               if (subject != null) {
+                                       logger.info("SUBJECT.PRINCIPALS.size()="
+                                                       + 
subject.getPrincipals().size());
+                                       java.util.Set<Principal> principals = 
subject
+                                                       .getPrincipals();
+                                       for (Principal principal : principals) {
+                                               
logger.info("SUBJECT.PRINCIPAL.NAME="
+                                                               + 
principal.getName());
+                                       }
+                                       try {
+                                               // Do not remove the below 
statement. The default
+                                               // getLoginUser does some 
initialization which is needed
+                                               // for getUGIFromSubject() to 
work.
+                                               logger.info("Default UGI before 
using Subject from Kafka:"
+                                                               + 
UserGroupInformation.getLoginUser());
+                                       } catch (Throwable t) {
+                                               logger.error(t);
+                                       }
+                                       UserGroupInformation ugi = 
UserGroupInformation
+                                                       
.getUGIFromSubject(subject);
+                                       logger.info("SUBJECT.UGI.NAME=" + 
ugi.getUserName()
+                                                       + ", ugi=" + ugi);
+                                       MiscUtil.setUGILoginUser(ugi, subject);
+                               } else {
+                                       logger.info("Server username is not 
available");
+                               }
+                               logger.info("LoginUser=" + 
MiscUtil.getUGILoginUser());
+                       } catch (Throwable t) {
+                               logger.error("Error getting principal.", t);
+                       }
+
+                       logger.info("Calling plugin.init()");
+                       rangerPlugin.init();
+
+                       RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
+                       rangerPlugin.setResultProcessor(auditHandler);
+               }
        }
 
        @Override
        public boolean authorize(Session session, Operation operation,
                        Resource resource) {
 
+               if (rangerPlugin == null) {
+                       MiscUtil.logErrorMessageByInterval(logger,
+                                       "Authorizer is still not initialized");
+                       return false;
+               }
                String userName = null;
                if (session.principal() != null) {
                        userName = session.principal().getName();
                        userName = StringUtils.substringBefore(userName, "/");
                        userName = StringUtils.substringBefore(userName, "@");
                }
-               java.util.Set<String> userGroups = getGroupsForUser(userName);
+               java.util.Set<String> userGroups = MiscUtil
+                               .getGroupsForRequestUser(userName);
                String ip = session.host();
 
                Date eventTime = StringUtil.getUTCDate();
@@ -101,9 +151,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
                String validationStr = "";
 
                if (accessType == null) {
-                       if (rangerPlugin
-                                       .logErrorMessage("Unsupported access 
type. operation="
-                                                       + operation)) {
+                       if (MiscUtil.logErrorMessageByInterval(logger,
+                                       "Unsupported access type. operation=" + 
operation)) {
                                logger.fatal("Unsupported access type. 
session=" + session
                                                + ", operation=" + operation + 
", resource=" + resource);
                        }
@@ -138,8 +187,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
 
                boolean returnValue = true;
                if (validationFailed) {
-                       rangerPlugin.logErrorMessage(validationStr + ", 
request="
-                                       + rangerRequest);
+                       MiscUtil.logErrorMessageByInterval(logger, validationStr
+                                       + ", request=" + rangerRequest);
                        returnValue = false;
                } else {
 
@@ -229,19 +278,6 @@ public class RangerKafkaAuthorizer implements Authorizer {
        }
 
        /**
-        * @param userName
-        * @return
-        */
-       private java.util.Set<String> getGroupsForUser(String userName) {
-               if (userName == null) {
-                       return null;
-               }
-
-               // TODO: Need to implement this method
-               return null;
-       }
-
-       /**
         * @param operation
         * @return
         */

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/859f3bd3/security-admin/src/main/java/org/apache/ranger/service/filter/RangerRESTAPIFilter.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/java/org/apache/ranger/service/filter/RangerRESTAPIFilter.java
 
b/security-admin/src/main/java/org/apache/ranger/service/filter/RangerRESTAPIFilter.java
index 21536ac..0e5bb08 100644
--- 
a/security-admin/src/main/java/org/apache/ranger/service/filter/RangerRESTAPIFilter.java
+++ 
b/security-admin/src/main/java/org/apache/ranger/service/filter/RangerRESTAPIFilter.java
@@ -93,15 +93,6 @@ public class RangerRESTAPIFilter extends LoggingFilter {
                if (logStdOut) {
                        String path = request.getRequestUri().getPath();
 
-                       // 
mediaType=multipart/form-data;boundary=----WebKitFormBoundaryTHan76r5AkgpAuVG
-                       if (request.getMediaType() != null) {
-                               // logger.info("DELETE ME: mediaType=" + 
request.getMediaType()
-                               // + ", getType()" + 
request.getMediaType().getType()
-                               // + ", getSubType()="
-                               // + request.getMediaType().getSubtype());
-                       } else {
-                               logger.info("DELETE ME: mediaType is null. 
path=" + path);
-                       }
                        if ((request.getMediaType() == null || 
!request.getMediaType()
                                        .getType().equals("multipart"))
                                        && 
!path.endsWith("/service/general/logs")) {


Reply via email to