Author: mona
Date: Wed Oct 16 23:39:08 2013
New Revision: 1532942
URL: http://svn.apache.org/r1532942
Log:
OOZIE-1569 Maintain backward compatibility for running jobs before upgrade
(mona)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
oozie/trunk/release-log.txt
oozie/trunk/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
oozie/trunk/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
Wed Oct 16 23:39:08 2013
@@ -19,13 +19,21 @@ package org.apache.oozie.action.hadoop;
import static
org.apache.oozie.action.hadoop.LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.XOozieClient;
+import org.apache.oozie.service.HadoopAccessorException;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
@@ -102,6 +110,20 @@ public class HiveActionExecutor extends
return true;
}
+ @Override
+ protected void getActionData(FileSystem actionFs, RunningJob runningJob,
WorkflowAction action, Context context)
+ throws HadoopAccessorException, JDOMException, IOException,
URISyntaxException {
+ super.getActionData(actionFs, runningJob, action, context);
+
+ if (action.getData() != null) {
+ // Load stored Hadoop jobs ids and promote them as external child
+ // ids on job success
+ Properties props = new Properties();
+ props.load(new StringReader(action.getData()));
+ context.setExternalChildIDs((String)
props.get(LauncherMain.HADOOP_JOBS));
+ }
+ }
+
/**
* Return the sharelib name for the action.
*
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
Wed Oct 16 23:39:08 2013
@@ -17,11 +17,8 @@
*/
package org.apache.oozie.action.hadoop;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.PrintStream;
import java.io.StringReader;
import java.net.ConnectException;
import java.net.URI;
@@ -65,7 +62,6 @@ import org.apache.oozie.service.URIHandl
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.servlet.CallbackServlet;
import org.apache.oozie.util.ELEvaluator;
-import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
@@ -1127,7 +1123,7 @@ public class JavaActionExecutor extends
XLog.getLog(getClass()).info(XLog.STD, "action completed,
external ID [{0}]",
action.getExternalId());
if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
- if (LauncherMapperHelper.hasOutputData(actionData)) {
+ if (getCaptureOutput(action) &&
LauncherMapperHelper.hasOutputData(actionData)) {
context.setExecutionData(SUCCEEDED,
PropertiesUtils.stringToProperties(actionData
.get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
XLog.getLog(getClass()).info(XLog.STD, "action
produced output");
@@ -1139,6 +1135,7 @@ public class JavaActionExecutor extends
context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
XLog.getLog(getClass()).info(XLog.STD, "action
produced stats");
}
+ getActionData(actionFs, runningJob, action, context);
}
else {
XLog log = XLog.getLog(getClass());
@@ -1205,6 +1202,20 @@ public class JavaActionExecutor extends
}
}
+ /**
+ * Get the output data of an action. Subclasses should override this method
+ * to get action specific output data.
+ *
+ * @param actionFs the FileSystem object
+ * @param runningJob the runningJob
+ * @param action the Workflow action
+ * @param context executor context
+ *
+ */
+ protected void getActionData(FileSystem actionFs, RunningJob runningJob,
WorkflowAction action, Context context)
+ throws HadoopAccessorException, JDOMException, IOException,
URISyntaxException {
+ }
+
protected boolean getCaptureOutput(WorkflowAction action) throws
JDOMException {
Element eConf = XmlUtils.parseXml(action.getConf());
Namespace ns = eConf.getNamespace();
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
Wed Oct 16 23:39:08 2013
@@ -30,6 +30,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +47,8 @@ import org.apache.oozie.service.HadoopAc
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.UserGroupInformationService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PropertiesUtils;
public class LauncherMapperHelper {
@@ -254,6 +258,50 @@ public class LauncherMapperHelper {
}
seqFile.close();
}
+ else { // maintain backward-compatibility. to be deprecated
+ org.apache.hadoop.fs.FileStatus[] files =
fs.listStatus(actionDir);
+ InputStream is;
+ BufferedReader reader = null;
+ Properties props;
+ if (files != null && files.length > 0) {
+ for (int x = 0; x < files.length; x++) {
+ Path file = files[x].getPath();
+ if (file.equals(new Path(actionDir,
"externalChildIds.properties"))) {
+ is = fs.open(file);
+ reader = new BufferedReader(new
InputStreamReader(is));
+
ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
+ IOUtils.getReaderAsString(reader, -1));
+ }
+ else if (file.equals(new Path(actionDir,
"newId.properties"))) {
+ is = fs.open(file);
+ reader = new BufferedReader(new
InputStreamReader(is));
+ props = PropertiesUtils.readProperties(reader,
-1);
+ ret.put(LauncherMapper.ACTION_DATA_NEW_ID,
props.getProperty("id"));
+ }
+ else if (file.equals(new Path(actionDir,
LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
+ int maxOutputData =
conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
+ 2 * 1024);
+ is = fs.open(file);
+ reader = new BufferedReader(new
InputStreamReader(is));
+
ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
+
.propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
+ }
+ else if (file.equals(new Path(actionDir,
LauncherMapper.ACTION_DATA_STATS))) {
+ int statsMaxOutputData =
conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+ Integer.MAX_VALUE);
+ is = fs.open(file);
+ reader = new BufferedReader(new
InputStreamReader(is));
+ ret.put(LauncherMapper.ACTION_DATA_STATS,
PropertiesUtils
+
.propertiesToString(PropertiesUtils.readProperties(reader,
statsMaxOutputData)));
+ }
+ else if (file.equals(new Path(actionDir,
LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
+ is = fs.open(file);
+ reader = new BufferedReader(new
InputStreamReader(is));
+
ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS,
IOUtils.getReaderAsString(reader, -1));
+ }
+ }
+ }
+ }
return ret;
}
});
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
Wed Oct 16 23:39:08 2013
@@ -19,11 +19,14 @@ package org.apache.oozie.action.hadoop;
import java.io.IOException;
import java.io.StringReader;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
@@ -32,6 +35,7 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.XLog;
@@ -225,6 +229,29 @@ public class SqoopActionExecutor extends
}
}
+ /**
+ * Get the stats and external child IDs
+ *
+ * @param actionFs the FileSystem object
+ * @param runningJob the runningJob
+ * @param action the Workflow action
+ * @param context executor context
+ *
+ */
+ @Override
+ protected void getActionData(FileSystem actionFs, RunningJob runningJob,
WorkflowAction action, Context context)
+ throws HadoopAccessorException, JDOMException, IOException,
URISyntaxException{
+ super.getActionData(actionFs, runningJob, action, context);
+
+ if (action.getData() != null) {
+ // Load stored Hadoop jobs ids and promote them as external child
+ // ids
+ Properties props = new Properties();
+ props.load(new StringReader(action.getData()));
+ context.setExternalChildIDs((String)
props.get(LauncherMain.HADOOP_JOBS));
+ }
+ }
+
@Override
protected boolean getCaptureOutput(WorkflowAction action) throws
JDOMException {
return true;
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Oct 16 23:39:08 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1569 Maintain backward compatibility for running jobs before upgrade
(mona)
OOZIE-1568 TestWorkflowClient.testSla is flakey (rkanter)
OOZIE-1517 Support using MS SQL Server as a metastore (dwann via rkanter)
OOZIE-1460 Implement and Document security for HA (rkanter)
Modified:
oozie/trunk/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
(original)
+++
oozie/trunk/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
Wed Oct 16 23:39:08 2013
@@ -277,6 +277,7 @@ public class HiveMain extends LauncherMa
}
}
finally {
+ System.out.println("\n<<< Invocation of Hive command completed
<<<\n");
writeExternalChildIDs(logFile);
}
@@ -287,12 +288,15 @@ public class HiveMain extends LauncherMa
try {
Properties jobIds = getHadoopJobIds(logFile,
HIVE_JOB_IDS_PATTERNS);
File file = new
File(System.getProperty(LauncherMapper.ACTION_PREFIX
- + LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS));
- final String hadoopJobIDs = jobIds.getProperty(HADOOP_JOBS);
+ + LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
OutputStream os = new FileOutputStream(file);
- os.write(hadoopJobIDs.getBytes());
- os.close();
- System.out.println(" Hadoop Job IDs executed by Hive: " +
hadoopJobIDs);
+ try {
+ jobIds.store(os, "");
+ }
+ finally {
+ os.close();
+ }
+ System.out.println(" Hadoop Job IDs executed by Hive: " +
jobIds.getProperty(HADOOP_JOBS));
System.out.println();
}
catch (Exception e) {
Modified:
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
(original)
+++
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
Wed Oct 16 23:39:08 2013
@@ -26,6 +26,7 @@ import java.io.Writer;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -151,11 +152,15 @@ public class TestHiveActionExecutor exte
ae.check(context, context.getAction());
assertTrue(launcherId.equals(context.getAction().getExternalId()));
assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+ assertNotNull(context.getAction().getData());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK,
context.getAction().getStatus());
- assertNotNull(context.getExternalChildIDs());
-
assertEquals(actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS),
context.getExternalChildIDs());
+ assertNotNull(context.getAction().getData());
+ Properties outputData = new Properties();
+ outputData.load(new StringReader(context.getAction().getData()));
+ assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS));
+ assertEquals(outputData.get(LauncherMain.HADOOP_JOBS),
context.getExternalChildIDs());
//while this works in a real cluster, it does not with miniMR
//assertTrue(outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length() >
0);
Modified:
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
(original)
+++
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
Wed Oct 16 23:39:08 2013
@@ -116,7 +116,7 @@ public class TestHiveMain extends MainTe
setSystemProperty("oozie.launcher.job.id", "" +
System.currentTimeMillis());
setSystemProperty("oozie.action.conf.xml",
actionXml.getAbsolutePath());
- setSystemProperty("oozie.action.externalChildIDs",
outputDataFile.getAbsolutePath());
+ setSystemProperty("oozie.action.output.properties",
outputDataFile.getAbsolutePath());
new LauncherSecurityManager();
String user = System.getProperty("user.name");
Modified:
oozie/trunk/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
(original)
+++
oozie/trunk/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
Wed Oct 16 23:39:08 2013
@@ -187,10 +187,14 @@ public class SqoopMain extends LauncherM
Properties jobIds = getHadoopJobIds(logFile, SQOOP_JOB_IDS_PATTERNS);
File file = new File(System.getProperty(LauncherMapper.ACTION_PREFIX
- + LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS));
+ + LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
OutputStream os = new FileOutputStream(file);
- os.write(jobIds.getProperty(HADOOP_JOBS).getBytes());
- os.close();
+ try {
+ jobIds.store(os, "");
+ }
+ finally {
+ os.close();
+ }
System.out.println(" Hadoop Job IDs executed by Sqoop: " +
jobIds.getProperty(HADOOP_JOBS));
System.out.println();
}
Modified:
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java?rev=1532942&r1=1532941&r2=1532942&view=diff
==============================================================================
---
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
(original)
+++
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
Wed Oct 16 23:39:08 2013
@@ -51,6 +51,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
public class TestSqoopActionExecutor extends ActionExecutorTestCase {
@@ -199,6 +200,7 @@ public class TestSqoopActionExecutor ext
ae.check(context, context.getAction());
assertTrue(launcherId.equals(context.getAction().getExternalId()));
assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+ assertNotNull(context.getAction().getData());
assertNotNull(context.getAction().getExternalChildIDs());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK,
context.getAction().getStatus());
@@ -219,7 +221,11 @@ public class TestSqoopActionExecutor ext
br.close();
assertEquals(3, count);
-
assertTrue(actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS).trim().length()
> 0);
+ assertNotNull(context.getAction().getData());
+ Properties outputData = new Properties();
+ outputData.load(new StringReader(context.getAction().getData()));
+ assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS));
+
assertTrue(outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length() >
0);
}
public void testSqoopEval() throws Exception {
@@ -251,7 +257,11 @@ public class TestSqoopActionExecutor ext
assertNotNull(hadoopCounters);
assertTrue(hadoopCounters.isEmpty());
-
assertTrue(actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS).isEmpty());
+ assertNotNull(context.getAction().getData());
+ Properties outputData = new Properties();
+ outputData.load(new StringReader(context.getAction().getData()));
+ assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS));
+ assertEquals(0,
outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length());
}
public void testSqoopActionFreeFormQuery() throws Exception {
@@ -302,7 +312,11 @@ public class TestSqoopActionExecutor ext
}
assertEquals(3, count);
-
assertTrue(actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS).trim().length()
> 0);
+ assertNotNull(context.getAction().getData());
+ Properties outputData = new Properties();
+ outputData.load(new StringReader(context.getAction().getData()));
+ assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS));
+
assertTrue(outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length() >
0);
}