Author: rkanter
Date: Fri Oct 11 01:56:24 2013
New Revision: 1531170

URL: http://svn.apache.org/r1531170
Log:
OOZIE-1460 Implement and Document security for HA (rkanter)

Modified:
    
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
    oozie/trunk/docs/src/site/twiki/AG_Install.twiki
    oozie/trunk/release-log.txt

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java?rev=1531170&r1=1531169&r2=1531170&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
 Fri Oct 11 01:56:24 2013
@@ -18,27 +18,31 @@
 package org.apache.oozie.service;
 
 import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XLogStreamer;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.Writer;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.rest.RestConstants;
-import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.SimpleTimestampedMessageParser;
 import org.apache.oozie.util.TimestampedMessageParser;
 import org.apache.oozie.util.XLog;
@@ -54,6 +58,7 @@ public class ZKXLogStreamingService exte
 
     private ZKUtils zk;
     private XLog log;
+    private Class<? extends Authenticator> AuthenticatorClass;
 
     /**
      * Initialize the log streaming service.
@@ -71,6 +76,11 @@ public class ZKXLogStreamingService exte
             throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
         }
         log = XLog.getLog(this.getClass());
+        try {
+            AuthenticatorClass = determineAuthenticatorClassType();
+        } catch (Exception ex) {
+            throw new ServiceException(ErrorCode.E0100, ex);
+        }
     }
 
     /**
@@ -149,7 +159,6 @@ public class ZKXLogStreamingService exte
             throw new IOException("Issue communicating with ZooKeeper: " + 
ex.getMessage(), ex);
         }
         List<TimestampedMessageParser> parsers = new 
ArrayList<TimestampedMessageParser>(oozies.size());
-        List<File> localTempFiles = new ArrayList<File>(oozies.size() - 1);
         try {
             // Create a BufferedReader for getting the logs of each server and 
put them in a TimestampedMessageParser
             for (ServiceInstance<Map> oozie : oozies) {
@@ -166,9 +175,7 @@ public class ZKXLogStreamingService exte
                     String otherUrl = 
oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
                     String jobId = 
filter.getFilterParams().get(DagXLogInfoService.JOB);
                     try {
-                        File f = File.createTempFile(otherId, ".log");  // 
Create a temp file to stream the log to
-                        localTempFiles.add(f);
-                        BufferedReader reader = fetchOtherLog(otherUrl, jobId, 
f);
+                        BufferedReader reader = fetchOtherLog(otherUrl, jobId);
                         parsers.add(new SimpleTimestampedMessageParser(reader, 
filter));
                     }
                     catch(IOException ioe) {
@@ -231,39 +238,85 @@ public class ZKXLogStreamingService exte
             for (TimestampedMessageParser parser : parsers) {
                 parser.closeReader();
             }
-            for (File f : localTempFiles) {
-                f.delete();
-            }
             writer.flush();
         }
     }
 
     /**
-     * Creates a connection over HTTP to another Oozie server and asks it for 
the logs related to the jobId.  It will then stream
-     * the logs to a temporary file in the local filesystem and return a 
BufferedReader to that file (this prevents us from leaving
-     * an Http stream open for too long while processing line-by-line and 
running into timeouts).
+     * Creates a connection over HTTP to another Oozie server and asks it for 
the logs related to the jobId.
      *
      * @param otherUrl The URL of the other Oozie server
      * @param jobId The job id of the job we want logs for
-     * @param tempFile The temporary file to save the log to
      * @return a BufferedReader of the logs from the other server for the jobId
      * @throws IOException If there was a problem connecting to the other 
Oozie server
      */
-    private BufferedReader fetchOtherLog(String otherUrl, String jobId, File 
tempFile) throws IOException {
+    private BufferedReader fetchOtherLog(String otherUrl, String jobId) throws 
IOException {
         // It's important that we specify ALL_SERVERS_PARAM=false in the GET 
request to prevent the other Oozie Server from trying
         // aggregate logs from the other Oozie servers (and creating an 
infinite recursion)
-        URL url = new URL(otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + 
"/" + RestConstants.JOB + "/" + jobId
+        final URL url = new URL(otherUrl + "/v" + 
OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB + "/" + jobId
                 + "?" + RestConstants.JOB_SHOW_PARAM + "=" + 
RestConstants.JOB_SHOW_LOG + "&" + ALL_SERVERS_PARAM + "=false");
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+
+        log.debug("Fetching logs from [{0}]", url);
         BufferedReader reader = null;
-        if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
-            InputStream is = conn.getInputStream();
-            FileOutputStream fos = new FileOutputStream(tempFile);
-            IOUtils.copyStream(is, fos);
-            is.close();
-            fos.close();
-            reader = new BufferedReader(new FileReader(tempFile));
+        try {
+            reader = UserGroupInformation.getLoginUser().doAs(new 
PrivilegedExceptionAction<BufferedReader>() {
+                @Override
+                public BufferedReader run() throws IOException {
+                    HttpURLConnection conn = getConnection(url);
+                    BufferedReader reader = null;
+                    if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) 
{
+                        InputStream is = conn.getInputStream();
+                        reader = new BufferedReader(new InputStreamReader(is));
+                    }
+                    return reader;
+                }
+            });
+        }
+        catch (InterruptedException ie) {
+            throw new IOException(ie);
         }
         return reader;
     }
+
+    private HttpURLConnection getConnection(URL url) throws IOException {
+        AuthenticatedURL.Token token = new AuthenticatedURL.Token();
+        HttpURLConnection conn;
+        try {
+            conn = new 
AuthenticatedURL(AuthenticatorClass.newInstance()).openConnection(url, token);
+        }
+        catch (AuthenticationException ex) {
+            throw new IOException("Could not authenticate, " + 
ex.getMessage(), ex);
+        } catch (InstantiationException ex) {
+            throw new IOException("Could not authenticate, " + 
ex.getMessage(), ex);
+        } catch (IllegalAccessException ex) {
+            throw new IOException("Could not authenticate, " + 
ex.getMessage(), ex);
+        }
+        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+            throw new IOException("Unexpected response code [" + 
conn.getResponseCode() + "], message [" + conn.getResponseMessage()
+                    + "]");
+        }
+        return conn;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Class<? extends Authenticator> determineAuthenticatorClassType() 
throws Exception {
+        // Adapted from 
org.apache.hadoop.security.authentication.server.AuthenticationFilter#init
+        Class<? extends Authenticator> authClass;
+        String authName = 
Services.get().getConf().get("oozie.authentication.type");
+        String authClassName;
+        if (authName == null) {
+            throw new IOException("Authentication type must be specified: 
simple|kerberos|<class>");
+        }
+        authName = authName.trim();
+        if (authName.equals("simple")) {
+            authClassName = PseudoAuthenticator.class.getName();
+        } else if (authName.equals("kerberos")) {
+            authClassName = KerberosAuthenticator.class.getName();
+        } else {
+            authClassName = authName;
+        }
+
+        authClass = (Class<? extends Authenticator>) 
Thread.currentThread().getContextClassLoader().loadClass(authClassName);
+        return authClass;
+    }
 }

Modified: oozie/trunk/docs/src/site/twiki/AG_Install.twiki
URL: 
http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/AG_Install.twiki?rev=1531170&r1=1531169&r2=1531170&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/AG_Install.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/AG_Install.twiki Fri Oct 11 01:56:24 2013
@@ -787,6 +787,12 @@ export OOZIE_INSTANCE_ID="${OOZIE_HTTP_H
 Note: If one of the Oozie servers becomes unavailable, querying Oozie for the 
logs from a job in the Web UI, REST API, or client may
 be missing information until that server comes back up.
 
+---++++ Security
+
+Oozie HA works with the existing Oozie security framework and settings.  There 
are no additional security settings specific to HA;
+however, for log streaming to work properly in a secure setup, 
=oozie.authentication.type= must be set properly on each server
+(though this is already required if using security in the first place).
+
 ---++ Starting and Stopping Oozie
 
 Use the standard Tomcat commands to start and stop Oozie.

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1531170&r1=1531169&r2=1531170&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Oct 11 01:56:24 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1460 Implement and Document security for HA (rkanter)
 OOZIE-1570 Make openjpa connection properties configurable (rohini)
 OOZIE-1560 Log messages should have a way of identifying which server they 
came from when using HA (rkanter)
 OOZIE-1566 Add reference to Quartz module in cron documentation (bowenzhangusa 
via rkanter)


Reply via email to