Author: rkanter
Date: Fri Oct 11 01:56:24 2013
New Revision: 1531170
URL: http://svn.apache.org/r1531170
Log:
OOZIE-1460 Implement and Document security for HA (rkanter)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
oozie/trunk/docs/src/site/twiki/AG_Install.twiki
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java?rev=1531170&r1=1531169&r2=1531170&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
Fri Oct 11 01:56:24 2013
@@ -18,27 +18,31 @@
package org.apache.oozie.service;
import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLogStreamer;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import
org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
-import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
@@ -54,6 +58,7 @@ public class ZKXLogStreamingService exte
private ZKUtils zk;
private XLog log;
+ private Class<? extends Authenticator> AuthenticatorClass;
/**
* Initialize the log streaming service.
@@ -71,6 +76,11 @@ public class ZKXLogStreamingService exte
throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
}
log = XLog.getLog(this.getClass());
+ try {
+ AuthenticatorClass = determineAuthenticatorClassType();
+ } catch (Exception ex) {
+ throw new ServiceException(ErrorCode.E0100, ex);
+ }
}
/**
@@ -149,7 +159,6 @@ public class ZKXLogStreamingService exte
throw new IOException("Issue communicating with ZooKeeper: " +
ex.getMessage(), ex);
}
List<TimestampedMessageParser> parsers = new
ArrayList<TimestampedMessageParser>(oozies.size());
- List<File> localTempFiles = new ArrayList<File>(oozies.size() - 1);
try {
// Create a BufferedReader for getting the logs of each server and
put them in a TimestampedMessageParser
for (ServiceInstance<Map> oozie : oozies) {
@@ -166,9 +175,7 @@ public class ZKXLogStreamingService exte
String otherUrl =
oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
String jobId =
filter.getFilterParams().get(DagXLogInfoService.JOB);
try {
- File f = File.createTempFile(otherId, ".log"); //
Create a temp file to stream the log to
- localTempFiles.add(f);
- BufferedReader reader = fetchOtherLog(otherUrl, jobId,
f);
+ BufferedReader reader = fetchOtherLog(otherUrl, jobId);
parsers.add(new SimpleTimestampedMessageParser(reader,
filter));
}
catch(IOException ioe) {
@@ -231,39 +238,85 @@ public class ZKXLogStreamingService exte
for (TimestampedMessageParser parser : parsers) {
parser.closeReader();
}
- for (File f : localTempFiles) {
- f.delete();
- }
writer.flush();
}
}
/**
- * Creates a connection over HTTP to another Oozie server and asks it for
the logs related to the jobId. It will then stream
- * the logs to a temporary file in the local filesystem and return a
BufferedReader to that file (this prevents us from leaving
- * an Http stream open for too long while processing line-by-line and
running into timeouts).
+ * Creates a connection over HTTP to another Oozie server and asks it for
the logs related to the jobId.
*
* @param otherUrl The URL of the other Oozie server
* @param jobId The job id of the job we want logs for
- * @param tempFile The temporary file to save the log to
* @return a BufferedReader of the logs from the other server for the jobId
* @throws IOException If there was a problem connecting to the other
Oozie server
*/
- private BufferedReader fetchOtherLog(String otherUrl, String jobId, File
tempFile) throws IOException {
+ private BufferedReader fetchOtherLog(String otherUrl, String jobId) throws
IOException {
// It's important that we specify ALL_SERVERS_PARAM=false in the GET
request to prevent the other Oozie Server from trying
// aggregate logs from the other Oozie servers (and creating an
infinite recursion)
- URL url = new URL(otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION +
"/" + RestConstants.JOB + "/" + jobId
+ final URL url = new URL(otherUrl + "/v" +
OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB + "/" + jobId
+ "?" + RestConstants.JOB_SHOW_PARAM + "=" +
RestConstants.JOB_SHOW_LOG + "&" + ALL_SERVERS_PARAM + "=false");
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+
+ log.debug("Fetching logs from [{0}]", url);
BufferedReader reader = null;
- if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
- InputStream is = conn.getInputStream();
- FileOutputStream fos = new FileOutputStream(tempFile);
- IOUtils.copyStream(is, fos);
- is.close();
- fos.close();
- reader = new BufferedReader(new FileReader(tempFile));
+ try {
+ reader = UserGroupInformation.getLoginUser().doAs(new
PrivilegedExceptionAction<BufferedReader>() {
+ @Override
+ public BufferedReader run() throws IOException {
+ HttpURLConnection conn = getConnection(url);
+ BufferedReader reader = null;
+ if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK))
{
+ InputStream is = conn.getInputStream();
+ reader = new BufferedReader(new InputStreamReader(is));
+ }
+ return reader;
+ }
+ });
+ }
+ catch (InterruptedException ie) {
+ throw new IOException(ie);
}
return reader;
}
+
+ private HttpURLConnection getConnection(URL url) throws IOException {
+ AuthenticatedURL.Token token = new AuthenticatedURL.Token();
+ HttpURLConnection conn;
+ try {
+ conn = new
AuthenticatedURL(AuthenticatorClass.newInstance()).openConnection(url, token);
+ }
+ catch (AuthenticationException ex) {
+ throw new IOException("Could not authenticate, " +
ex.getMessage(), ex);
+ } catch (InstantiationException ex) {
+ throw new IOException("Could not authenticate, " +
ex.getMessage(), ex);
+ } catch (IllegalAccessException ex) {
+ throw new IOException("Could not authenticate, " +
ex.getMessage(), ex);
+ }
+ if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ throw new IOException("Unexpected response code [" +
conn.getResponseCode() + "], message [" + conn.getResponseMessage()
+ + "]");
+ }
+ return conn;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Class<? extends Authenticator> determineAuthenticatorClassType()
throws Exception {
+ // Adapted from
org.apache.hadoop.security.authentication.server.AuthenticationFilter#init
+ Class<? extends Authenticator> authClass;
+ String authName =
Services.get().getConf().get("oozie.authentication.type");
+ String authClassName;
+ if (authName == null) {
+ throw new IOException("Authentication type must be specified:
simple|kerberos|<class>");
+ }
+ authName = authName.trim();
+ if (authName.equals("simple")) {
+ authClassName = PseudoAuthenticator.class.getName();
+ } else if (authName.equals("kerberos")) {
+ authClassName = KerberosAuthenticator.class.getName();
+ } else {
+ authClassName = authName;
+ }
+
+ authClass = (Class<? extends Authenticator>)
Thread.currentThread().getContextClassLoader().loadClass(authClassName);
+ return authClass;
+ }
}
Modified: oozie/trunk/docs/src/site/twiki/AG_Install.twiki
URL:
http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/AG_Install.twiki?rev=1531170&r1=1531169&r2=1531170&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/AG_Install.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/AG_Install.twiki Fri Oct 11 01:56:24 2013
@@ -787,6 +787,12 @@ export OOZIE_INSTANCE_ID="${OOZIE_HTTP_H
Note: If one of the Oozie servers becomes unavailable, querying Oozie for the
logs from a job in the Web UI, REST API, or client may
be missing information until that server comes back up.
+---++++ Security
+
+Oozie HA works with the existing Oozie security framework and settings. There
are no additional security settings specific to HA;
+however, for log streaming to work properly in a secure setup,
=oozie.authentication.type= must be set properly on each server
+(though this is already required if using security in the first place).
+
---++ Starting and Stopping Oozie
Use the standard Tomcat commands to start and stop Oozie.
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1531170&r1=1531169&r2=1531170&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Oct 11 01:56:24 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1460 Implement and Document security for HA (rkanter)
OOZIE-1570 Make openjpa connection properties configurable (rohini)
OOZIE-1560 Log messages should have a way of identifying which server they
came from when using HA (rkanter)
OOZIE-1566 Add reference to Quartz module in cron documentation (bowenzhangusa
via rkanter)