Repository: oozie Updated Branches: refs/heads/oya aa1dd9613 -> 4a9542caf
OOZIE-2698 Refactor LauncherAM to make it more testable Change-Id: I63acfd9a8ec8d4077221deff2e954dda4f3a8281 Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9151f4eb Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9151f4eb Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9151f4eb Branch: refs/heads/oya Commit: 9151f4eb189da00022465f2c87f3b7db3ae2f024 Parents: aa1dd96 Author: Peter Bacsko <[email protected]> Authored: Mon Oct 17 17:08:16 2016 +0200 Committer: Peter Bacsko <[email protected]> Committed: Mon Oct 17 17:08:16 2016 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/TestLauncherAM.java | 46 -- .../hadoop/TestLauncherAMCallbackNotifier.java | 25 +- .../action/hadoop/AMRMCallBackHandler.java | 72 +++ .../action/hadoop/AMRMClientAsyncFactory.java | 32 + .../apache/oozie/action/hadoop/ErrorHolder.java | 56 ++ .../oozie/action/hadoop/HdfsOperations.java | 88 +++ .../apache/oozie/action/hadoop/LauncherAM.java | 602 ++++++++----------- .../hadoop/LauncherAMCallbackNotifier.java | 15 +- .../LauncherAMCallbackNotifierFactory.java | 27 + .../oozie/action/hadoop/LocalFsOperations.java | 100 +++ .../action/hadoop/PrepareActionsDriver.java | 38 +- .../action/hadoop/PrepareActionsHandler.java | 100 +++ .../hadoop/SequenceFileWriterFactory.java | 34 ++ 13 files changed, 764 insertions(+), 471 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java deleted file mode 100644 index ed29299..0000000 --- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.action.hadoop; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XFsTestCase; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.XConfiguration; - -import java.io.File; -import java.io.FileWriter; -import java.io.Writer; -import java.net.URI; -import java.util.Map; - -public class TestLauncherAM extends XFsTestCase { - - - // TODO: OYA: write tests later - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java index 9ba04da..fcf99e3 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java @@ -21,6 +21,7 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.oozie.QueryServlet; +import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult; import org.apache.oozie.command.wf.HangServlet; import org.apache.oozie.test.EmbeddedServletContainer; import org.apache.oozie.test.XTestCase; @@ -109,7 +110,7 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); long start = System.currentTimeMillis(); - cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false); + cnSpy.notifyURL(OozieActionResult.SUCCEEDED); long end = System.currentTimeMillis(); Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce(); Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000); @@ -120,7 +121,7 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); start = System.currentTimeMillis(); - cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false); + cnSpy.notifyURL(OozieActionResult.SUCCEEDED); end = System.currentTimeMillis(); Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce(); Assert.assertTrue("Should have taken more than 9 seconds but it only took " + (end - start), end - start >= 9000); @@ -133,7 +134,7 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { LauncherAMCallbackNotifier cnSpy = Mockito.spy(new 
LauncherAMCallbackNotifier(conf)); long start = System.currentTimeMillis(); - cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false); + cnSpy.notifyURL(OozieActionResult.SUCCEEDED); long end = System.currentTimeMillis(); Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce(); Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000); @@ -145,7 +146,7 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); assertNull(QueryServlet.lastQueryString); - cn.notifyURL(FinalApplicationStatus.SUCCEEDED, false); + cn.notifyURL(OozieActionResult.SUCCEEDED); waitForCallbackAndCheckResult(FinalApplicationStatus.SUCCEEDED.toString()); } @@ -155,18 +156,8 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); assertNull(QueryServlet.lastQueryString); - cn.notifyURL(FinalApplicationStatus.SUCCEEDED, true); - waitForCallbackAndCheckResult("RUNNING"); - } - - public void testNotifyBackgroundActionWhenSubmitFailsWithKilled() throws Exception { - Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); - - LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); - - assertNull(QueryServlet.lastQueryString); - cn.notifyURL(FinalApplicationStatus.KILLED, true); - waitForCallbackAndCheckResult(FinalApplicationStatus.KILLED.toString()); + cn.notifyURL(OozieActionResult.RUNNING); + waitForCallbackAndCheckResult(OozieActionResult.RUNNING.toString()); } public void testNotifyBackgroundActionWhenSubmitFailsWithFailed() throws Exception { @@ -175,7 +166,7 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); assertNull(QueryServlet.lastQueryString); - cn.notifyURL(FinalApplicationStatus.FAILED, true); + 
cn.notifyURL(OozieActionResult.FAILED); waitForCallbackAndCheckResult(FinalApplicationStatus.FAILED.toString()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java new file mode 100644 index 0000000..63213e6 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; + +// Note: methods which modify/read the state of errorHolder are synchronized to avoid data races when LauncherAM invokes getError() +public class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler { + private ErrorHolder errorHolder; + + @Override + public void onContainersCompleted(List<ContainerStatus> containerStatuses) { + //noop + } + + @Override + public void onContainersAllocated(List<Container> containers) { + //noop + } + + @Override + public synchronized void onShutdownRequest() { + System.out.println("Resource manager requested AM Shutdown"); + errorHolder = new ErrorHolder(); + errorHolder.setErrorCode(0); + errorHolder.setErrorMessage("ResourceManager requested AM Shutdown"); + } + + @Override + public void onNodesUpdated(List<NodeReport> nodeReports) { + //noop + } + + @Override + public float getProgress() { + return 0.5f; //TODO: OYA: maybe some action types can report better progress? 
+ } + + @Override + public synchronized void onError(final Throwable ex) { + System.out.println("Received asynchronous error"); + ex.printStackTrace(); + errorHolder = new ErrorHolder(); + errorHolder.setErrorCode(0); + errorHolder.setErrorMessage(ex.getMessage()); + errorHolder.setErrorCause(ex); + } + + public synchronized ErrorHolder getError() { + return errorHolder; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java new file mode 100644 index 0000000..b4cbb4b --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; + +public class AMRMClientAsyncFactory { + + public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) { + AMRMClient<?> amRmClient = AMRMClient.createAMRMClient(); + AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler(); + AMRMClientAsync<?> amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler); + + return amRmClientAsync; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java new file mode 100644 index 0000000..6a755db --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +public class ErrorHolder { + private int errorCode = 0; + private Throwable errorCause = null; + private String errorMessage = null; + private boolean populated = false; + + public int getErrorCode() { + return errorCode; + } + + public void setErrorCode(int errorCode) { + this.errorCode = errorCode; + this.populated = true; + } + + public Throwable getErrorCause() { + return errorCause; + } + + public void setErrorCause(Throwable errorCause) { + this.errorCause = errorCause; + this.populated = true; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + this.populated = true; + } + + public boolean isPopulated() { + return populated; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java new file mode 100644 index 0000000..593de00 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.base.Preconditions; + +public class HdfsOperations { + private final SequenceFileWriterFactory seqFileWriterFactory; + private final UserGroupInformation ugi; + + public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, UserGroupInformation ugi) { + this.seqFileWriterFactory = Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should not be null"); + this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); + } + + /** + * Creates a Sequence file which contains the output from an action and uploads it to HDFS. 
+ */ + public void uploadActionDataToHDFS(final Configuration launcherJobConf, final Path actionDir, final Map<String, String> actionData) throws IOException { + IOException ioe = ugi.doAs(new PrivilegedAction<IOException>() { + @Override + public IOException run() { + Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE); + // upload into sequence file + System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + + new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE).toUri()); + + SequenceFile.Writer wr = null; + try { + wr = seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class); + + if (wr != null) { + Set<String> keys = actionData.keySet(); + for (String propsKey : keys) { + wr.append(new Text(propsKey), new Text(actionData.get(propsKey))); + } + } else { + throw new IOException("SequenceFile.Writer is null for " + finalPath); + } + } catch (IOException e) { + e.printStackTrace(); + return e; + } finally { + if (wr != null) { + try { + wr.close(); + } catch (IOException e) { + e.printStackTrace(); + return e; + } + } + } + + return null; + } + }); + + if (ioe != null) { + throw ioe; + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java index 85d78c6..89357ad 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -17,120 +17,133 @@ */ package org.apache.oozie.action.hadoop; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; import java.io.PrintWriter; -import 
java.io.Reader; import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.security.Permission; import java.security.PrivilegedAction; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.StringTokenizer; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.exceptions.YarnException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; public class LauncherAM { - - static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; - - static final String ACTION_PREFIX = "oozie.action."; + private static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml"; + private static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id"; + public static final String ACTIONOUTPUTTYPE_ID_SWAP = "IdSwap"; + public static final String ACTIONOUTPUTTYPE_OUTPUT = "Output"; + public static final String ACTIONOUTPUTTYPE_STATS = "Stats"; + public static final String ACTIONOUTPUTTYPE_EXT_CHILD_ID = "ExtChildID"; + + public static final String JAVA_CLASS_PATH = "java.class.path"; + public static final String OOZIE_ACTION_ID = "oozie.action.id"; + public static 
final String OOZIE_JOB_ID = "oozie.job.id"; + public static final String ACTION_PREFIX = "oozie.action."; public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data"; - static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg."; - static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count"; - static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size"; - - static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path"; - static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml"; - static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE - static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs"; - static final String ACTION_DATA_OUTPUT_PROPS = "output.properties"; - static final String ACTION_DATA_STATS = "stats.properties"; - static final String ACTION_DATA_NEW_ID = "newId"; - static final String ACTION_DATA_ERROR_PROPS = "error.properties"; + public static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg."; + public static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count"; + public static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size"; + public static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path"; + public static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml"; + public static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE + public static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs"; + public static final String ACTION_DATA_OUTPUT_PROPS = "output.properties"; + public static final String ACTION_DATA_STATS = "stats.properties"; + public static final String ACTION_DATA_NEW_ID = "newId"; + public static final String ACTION_DATA_ERROR_PROPS = "error.properties"; + public static final String 
CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; // TODO: OYA: more unique file names? action.xml may be stuck for backwards compat though public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml"; public static final String ACTION_CONF_XML = "action.xml"; public static final String ACTION_DATA_FINAL_STATUS = "final.status"; - private static AMRMClientAsync<AMRMClient.ContainerRequest> amRmClientAsync = null; - private static Configuration launcherJobConf = null; - private static Path actionDir; - private static Map<String, String> actionData = new HashMap<String,String>(); + private final UserGroupInformation ugi; + private final AMRMCallBackHandler callbackHandler; + private final AMRMClientAsyncFactory amRmClientAsyncFactory; + private final HdfsOperations hdfsOperations; + private final LocalFsOperations localFsOperations; + private final PrepareActionsHandler prepareHandler; + private final LauncherAMCallbackNotifierFactory callbackNotifierFactory; + private final LauncherSecurityManager launcherSecurityManager; + + private Configuration launcherJobConf; + private AMRMClientAsync<?> amRmClientAsync; + private Path actionDir; + private Map<String, String> actionData = new HashMap<String,String>(); + + public LauncherAM(UserGroupInformation ugi, + AMRMClientAsyncFactory amRmClientAsyncFactory, + AMRMCallBackHandler callbackHandler, + HdfsOperations hdfsOperations, + LocalFsOperations localFsOperations, + PrepareActionsHandler prepareHandler, + LauncherAMCallbackNotifierFactory callbackNotifierFactory, + LauncherSecurityManager launcherSecurityManager) { + this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); + this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory, "amRmClientAsyncFactory should not be null"); + this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null"); + this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, 
"hdfsOperations should not be null"); + this.localFsOperations = Preconditions.checkNotNull(localFsOperations, "localFsOperations should not be null"); + this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null"); + this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory, "callbackNotifierFactory should not be null"); + this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager, "launcherSecurityManager should not be null"); + } - private static void printDebugInfo(String[] mainArgs) throws IOException { - printContentsOfCurrentDir(); + public static void main(String[] args) throws Exception { + UserGroupInformation ugi = null; + String submitterUser = System.getProperty("submitter.user", "").trim(); + Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined"); + System.out.println("Submitter user is: " + submitterUser); - System.out.println(); - System.out.println("Oozie Launcher Application Master configuration"); - System.out.println("==============================================="); - System.out.println("Workflow job id : " + launcherJobConf.get("oozie.job.id")); - System.out.println("Workflow action id: " + launcherJobConf.get("oozie.action.id")); - System.out.println(); - System.out.println("Classpath :"); - System.out.println("------------------------"); - StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":"); - while (st.hasMoreTokens()) { - System.out.println(" " + st.nextToken()); - } - System.out.println("------------------------"); - System.out.println(); - String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); - System.out.println("Main class : " + mainClass); - System.out.println(); - System.out.println("Maximum output : " - + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); - System.out.println(); - System.out.println("Arguments :"); - for (String arg : mainArgs) { - 
System.out.println(" " + arg); + if (UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) { + System.out.println("Using login user for UGI"); + ugi = UserGroupInformation.getLoginUser(); + } else { + ugi = UserGroupInformation.createRemoteUser(submitterUser); + ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials()); } - System.out.println(); - System.out.println("Java System Properties:"); - System.out.println("------------------------"); - System.getProperties().store(System.out, ""); - System.out.flush(); - System.out.println("------------------------"); - System.out.println(); - - System.out.println("================================================================="); - System.out.println(); - System.out.println(">>> Invoking Main class now >>>"); - System.out.println(); - System.out.flush(); + AMRMClientAsyncFactory amRmClientAsyncFactory = new AMRMClientAsyncFactory(); + AMRMCallBackHandler callbackHandler = new AMRMCallBackHandler(); + HdfsOperations hdfsOperations = new HdfsOperations(new SequenceFileWriterFactory(), ugi); + LocalFsOperations localFSOperations = new LocalFsOperations(); + PrepareActionsHandler prepareHandler = new PrepareActionsHandler(); + LauncherAMCallbackNotifierFactory callbackNotifierFactory = new LauncherAMCallbackNotifierFactory(); + LauncherSecurityManager launcherSecurityManager = new LauncherSecurityManager(); + + LauncherAM launcher = new LauncherAM(ugi, + amRmClientAsyncFactory, + callbackHandler, + hdfsOperations, + localFSOperations, + prepareHandler, + callbackNotifierFactory, + launcherSecurityManager); + + launcher.run(); } // TODO: OYA: rethink all print messages and formatting - public static void main(String[] AMargs) throws Exception { - final ErrorHolder eHolder = new ErrorHolder(); - FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED; - String submitterUser = System.getProperty("submitter.user", "").trim(); - 
Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined"); - System.out.println("Submitter user is: " + submitterUser); + public void run() throws Exception { + final ErrorHolder errorHolder = new ErrorHolder(); + OozieActionResult actionResult = OozieActionResult.FAILED; + boolean launcerExecutedProperly = false; String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name()); @@ -144,25 +157,15 @@ public class LauncherAM { System.out.println("Login authMethod: " + login.getAuthenticationMethod()); System.out.println("JobUserName:" + jobUserName); - UserGroupInformation ugi = null; - - if (UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) { - System.out.println("Using login user for UGI"); - ugi = UserGroupInformation.getLoginUser(); - } else { - ugi = UserGroupInformation.createRemoteUser(submitterUser); - ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials()); - } - boolean backgroundAction = false; try { try { - launcherJobConf = readLauncherConf(); + launcherJobConf = localFsOperations.readLauncherConf(); System.out.println("Launcher AM configuration loaded"); } catch (Exception ex) { - eHolder.setErrorMessage("Could not load the Launcher AM configuration file"); - eHolder.setErrorCause(ex); + errorHolder.setErrorMessage("Could not load the Launcher AM configuration file"); + errorHolder.setErrorCause(ex); throw ex; } @@ -175,8 +178,8 @@ public class LauncherAM { executePrepare(ugi); System.out.println("Completed the execution of prepare actions successfully"); } catch (Exception ex) { - eHolder.setErrorMessage("Prepare execution in the Launcher AM has failed"); - eHolder.setErrorCause(ex); + errorHolder.setErrorMessage("Prepare execution in the Launcher AM has failed"); + errorHolder.setErrorCause(ex); throw ex; } @@ -185,14 +188,14 @@ public class LauncherAM { // TODO: OYA: should we allow turning this off? // TODO: OYA: what should default be? 
if (launcherJobConf.getBoolean("oozie.launcher.print.debug.info", true)) { - printDebugInfo(mainArgs); + printDebugInfo(); } setupMainConfiguration(); - finalStatus = runActionMain(mainArgs, eHolder, ugi); + launcerExecutedProperly = runActionMain(mainArgs, errorHolder, ugi); - if (finalStatus == FinalApplicationStatus.SUCCEEDED) { + if (launcerExecutedProperly) { handleActionData(); if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { System.out.println(); @@ -218,36 +221,87 @@ public class LauncherAM { System.err.println("Launcher AM execution failed"); e.printStackTrace(System.out); e.printStackTrace(System.err); - finalStatus = FinalApplicationStatus.FAILED; - eHolder.setErrorCause(e); - eHolder.setErrorMessage(e.getMessage()); + launcerExecutedProperly = false; + if (!errorHolder.isPopulated()) { + errorHolder.setErrorCause(e); + errorHolder.setErrorMessage(e.getMessage()); + } throw e; } finally { try { - // Store final status in case Launcher AM falls off the RM - actionData.put(ACTION_DATA_FINAL_STATUS, finalStatus.toString()); - if (finalStatus != FinalApplicationStatus.SUCCEEDED) { - failLauncher(eHolder); + ErrorHolder callbackErrorHolder = callbackHandler.getError(); + + if (launcerExecutedProperly) { + actionResult = backgroundAction ? 
OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED; } - uploadActionDataToHDFS(ugi); + + if (!launcerExecutedProperly) { + updateActionDataWithFailure(errorHolder, actionData); + } else if (callbackErrorHolder != null) { // async error from the callback + actionResult = OozieActionResult.FAILED; + updateActionDataWithFailure(callbackErrorHolder, actionData); + } + + actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString()); + hdfsOperations.uploadActionDataToHDFS(launcherJobConf, actionDir, actionData); } finally { try { - unregisterWithRM(finalStatus, eHolder.getErrorMessage()); + unregisterWithRM(actionResult, errorHolder.getErrorMessage()); } finally { - LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(launcherJobConf); - cn.notifyURL(finalStatus, backgroundAction); + LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherJobConf); + cn.notifyURL(actionResult); } } } } - private static void registerWithRM() throws IOException, YarnException { - AMRMClient<AMRMClient.ContainerRequest> amRmClient = AMRMClient.createAMRMClient(); + @VisibleForTesting + Map<String, String> getActionData() { + return actionData; + } + + private void printDebugInfo() throws IOException { + localFsOperations.printContentsOfDir(new File(".")); + + System.out.println(); + System.out.println("Oozie Launcher Application Master configuration"); + System.out.println("==============================================="); + System.out.println("Workflow job id : " + launcherJobConf.get(OOZIE_JOB_ID)); + System.out.println("Workflow action id: " + launcherJobConf.get(OOZIE_ACTION_ID)); + System.out.println(); + System.out.println("Classpath :"); + System.out.println("------------------------"); + StringTokenizer st = new StringTokenizer(System.getProperty(JAVA_CLASS_PATH), ":"); + while (st.hasMoreTokens()) { + System.out.println(" " + st.nextToken()); + } + System.out.println("------------------------"); + System.out.println(); + String 
mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + System.out.println("Main class : " + mainClass); + System.out.println(); + System.out.println("Maximum output : " + + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); + System.out.println(); + + System.out.println(); + System.out.println("Java System Properties:"); + System.out.println("------------------------"); + System.getProperties().store(System.out, ""); + System.out.flush(); + System.out.println("------------------------"); + System.out.println(); + + System.out.println("================================================================="); + System.out.println(); + System.out.println(">>> Invoking Main class now >>>"); + System.out.println(); + System.out.flush(); + } - AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler(); - // TODO: OYA: make heartbeat interval configurable - // TODO: OYA: make heartbeat interval higher to put less load on RM, but lower than timeout - amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, 60000, callBackHandler); + private void registerWithRM() throws IOException, YarnException { + // TODO: OYA: make heartbeat interval configurable & make interval higher to put less load on RM, but lower than timeout + amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000); amRmClientAsync.init(new Configuration(launcherJobConf)); amRmClientAsync.start(); @@ -255,28 +309,24 @@ public class LauncherAM { amRmClientAsync.registerApplicationMaster("", 0, ""); } - private static void unregisterWithRM(FinalApplicationStatus status, String message) throws YarnException, IOException { + private void unregisterWithRM(OozieActionResult actionResult, String message) throws YarnException, IOException { if (amRmClientAsync != null) { System.out.println("Stopping AM"); try { message = (message == null) ? 
"" : message; // tracking url is determined automatically - amRmClientAsync.unregisterApplicationMaster(status, message, ""); - } catch (YarnException ex) { - System.err.println("Error un-registering AM client"); - throw ex; - } catch (IOException ex) { + amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, ""); + } catch (Exception ex) { System.err.println("Error un-registering AM client"); throw ex; } finally { amRmClientAsync.stop(); - amRmClientAsync = null; } } } // Method to execute the prepare actions - private static void executePrepare(UserGroupInformation ugi) throws Exception { + private void executePrepare(UserGroupInformation ugi) throws Exception { Exception e = ugi.doAs(new PrivilegedAction<Exception>() { @Override public Exception run() { @@ -286,7 +336,7 @@ public class LauncherAM { if (prepareXML.length() != 0) { Configuration actionConf = new Configuration(launcherJobConf); actionConf.addResource(ACTION_CONF_XML); - PrepareActionsDriver.doOperations(prepareXML, actionConf); + prepareHandler.prepareAction(prepareXML, actionConf); } else { System.out.println("There are no prepare actions to execute."); } @@ -304,17 +354,13 @@ public class LauncherAM { } } - // FIXME - figure out what is actually needed here - private static void setupMainConfiguration() throws IOException { -// Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML), new Path(new File(ACTION_CONF_XML).getAbsolutePath())); -// FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf()); -// fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML), new Path(new File(ACTION_CONF_XML).getAbsolutePath())); - - System.setProperty("oozie.launcher.job.id", launcherJobConf.get("oozie.job.id")); -// System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); -// System.setProperty(OOZIE_ACTION_ID, launcherJobConf.get(OOZIE_ACTION_ID)); - System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath()); - 
System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath()); + private void setupMainConfiguration() throws IOException { + System.setProperty(OOZIE_LAUNCHER_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_ACTION_ID, launcherJobConf.get(OOZIE_ACTION_ID)); + System.setProperty(OOZIE_ACTION_CONF_XML, new File(ACTION_CONF_XML).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, + new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_STATS, new File(ACTION_DATA_STATS).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath()); @@ -326,33 +372,28 @@ public class LauncherAM { } else { System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis())); } - -// String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS); -// if (actionConfigClass != null) { -// System.setProperty(OOZIE_ACTION_CONFIG_CLASS, actionConfigClass); -// } } - private static FinalApplicationStatus runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) { - final AtomicReference<FinalApplicationStatus> finalStatus = new AtomicReference<FinalApplicationStatus>(FinalApplicationStatus.FAILED); + private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) { + // using AtomicBoolean because we want to modify it inside run() + final AtomicBoolean actionMainExecutedProperly = new AtomicBoolean(false); ugi.doAs(new PrivilegedAction<Void>() { @Override public Void run() { - LauncherSecurityManager secMan = new LauncherSecurityManager(); try { Class<?> klass = 
launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class); System.out.println("Launcher class: " + klass.toString()); System.out.flush(); Method mainMethod = klass.getMethod("main", String[].class); // Enable LauncherSecurityManager to catch System.exit calls - secMan.set(); + launcherSecurityManager.set(); mainMethod.invoke(null, (Object) mainArgs); System.out.println(); System.out.println("<<< Invocation of Main class completed <<<"); System.out.println(); - finalStatus.set(FinalApplicationStatus.SUCCEEDED); + actionMainExecutedProperly.set(true); } catch (InvocationTargetException ex) { ex.printStackTrace(System.out); // Get what actually caused the exception @@ -362,24 +403,31 @@ public class LauncherAM { cause = cause.getCause(); } if (LauncherMainException.class.isInstance(cause)) { + int errorCode = ((LauncherMainException) ex.getCause()).getErrorCode(); String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + - ((LauncherMainException) ex.getCause()).getErrorCode() + "]"); + errorCode + "]"); + eHolder.setErrorCode(errorCode); } else if (SecurityException.class.isInstance(cause)) { - if (secMan.getExitInvoked()) { - System.out.println("Intercepting System.exit(" + secMan.getExitCode() - + ")"); - System.err.println("Intercepting System.exit(" + secMan.getExitCode() - + ")"); + if (launcherSecurityManager.getExitInvoked()) { + final int exitCode = launcherSecurityManager.getExitCode(); + System.out.println("Intercepting System.exit(" + exitCode + ")"); + System.err.println("Intercepting System.exit(" + exitCode + ")"); // if 0 main() method finished successfully // ignoring - eHolder.setErrorCode(secMan.getExitCode()); - if (eHolder.getErrorCode() != 0) { + eHolder.setErrorCode(exitCode); + if (exitCode != 0) { String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); - eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + 
eHolder.getErrorCode() + "]"); + eHolder.setErrorMessage("Main Class [" + mainClass + "]," + + " exit code [" + eHolder.getErrorCode() + "]"); } else { - finalStatus.set(FinalApplicationStatus.SUCCEEDED); + actionMainExecutedProperly.set(true); } + } else { + // just SecurityException, no exit was invoked + eHolder.setErrorCode(0); + eHolder.setErrorCause(cause); + eHolder.setErrorMessage(cause.getMessage()); } } else { eHolder.setErrorMessage(cause.getMessage()); @@ -393,144 +441,55 @@ public class LauncherAM { System.out.flush(); System.err.flush(); // Disable LauncherSecurityManager - secMan.unset(); + launcherSecurityManager.unset(); } return null; } }); - return finalStatus.get(); + return actionMainExecutedProperly.get(); } - private static void handleActionData() throws IOException { + private void handleActionData() throws IOException { // external child IDs - String externalChildIdsProp = System.getProperty(ACTION_PREFIX - + ACTION_DATA_EXTERNAL_CHILD_IDS); - if (externalChildIdsProp != null) { - File externalChildIDs = new File(externalChildIdsProp); - if (externalChildIDs.exists()) { - actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1)); - } - } + processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null, ACTION_DATA_EXTERNAL_CHILD_IDS, -1, ACTIONOUTPUTTYPE_EXT_CHILD_ID); // external stats - String statsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_STATS); - if (statsProp != null) { - File actionStatsData = new File(statsProp); - if (actionStatsData.exists()) { - int statsMaxOutputData = launcherJobConf.getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, - Integer.MAX_VALUE); - actionData.put(ACTION_DATA_STATS, - getLocalFileContentStr(actionStatsData, "Stats", statsMaxOutputData)); - } - } + processActionData(ACTION_PREFIX + ACTION_DATA_STATS, CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, ACTION_DATA_STATS, Integer.MAX_VALUE, ACTIONOUTPUTTYPE_STATS); // output data - String outputProp = 
System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS); - if (outputProp != null) { - File actionOutputData = new File(outputProp); - if (actionOutputData.exists()) { - int maxOutputData = launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024); - actionData.put(ACTION_DATA_OUTPUT_PROPS, - getLocalFileContentStr(actionOutputData, "Output", maxOutputData)); - } - } + processActionData(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, ACTION_DATA_OUTPUT_PROPS, 2048, ACTIONOUTPUTTYPE_OUTPUT); // id swap - String newIdProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID); - if (newIdProp != null) { - File newId = new File(newIdProp); - if (newId.exists()) { - actionData.put(ACTION_DATA_NEW_ID, getLocalFileContentStr(newId, "", -1)); - } - } + processActionData(ACTION_PREFIX + ACTION_DATA_NEW_ID, null, ACTION_DATA_NEW_ID, -1, ACTIONOUTPUTTYPE_ID_SWAP); } - public static String getLocalFileContentStr(File file, String type, int maxLen) throws IOException { - StringBuilder sb = new StringBuilder(); - Reader reader = null; - try { - reader = new BufferedReader(new FileReader(file)); - char[] buffer = new char[2048]; - int read; - int count = 0; - while ((read = reader.read(buffer)) > -1) { - count += read; - if (maxLen > -1 && count > maxLen) { - throw new IOException(type + " data exceeds its limit [" + maxLen + "]"); - } - sb.append(buffer, 0, read); - } - } finally { - if (reader != null) { - reader.close(); - } - } - return sb.toString(); - } - - private static void uploadActionDataToHDFS(UserGroupInformation ugi) throws IOException { - IOException ioe = ugi.doAs(new PrivilegedAction<IOException>() { - @Override - public IOException run() { - Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE); - // upload into sequence file - System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " - + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri()); + private void 
processActionData(String propertyName, String maxSizePropertyName, String actionDataPropertyName, int maxSizeDefault, String type) throws IOException { + String propValue = System.getProperty(propertyName); + int maxSize = maxSizeDefault; - SequenceFile.Writer wr = null; - try { - wr = SequenceFile.createWriter(launcherJobConf, - SequenceFile.Writer.file(finalPath), - SequenceFile.Writer.keyClass(Text.class), - SequenceFile.Writer.valueClass(Text.class)); - if (wr != null) { - Set<String> keys = actionData.keySet(); - for (String propsKey : keys) { - wr.append(new Text(propsKey), new Text(actionData.get(propsKey))); - } - } else { - throw new IOException("SequenceFile.Writer is null for " + finalPath); - } - } catch (IOException e) { - e.printStackTrace(); - return e; - } finally { - if (wr != null) { - try { - wr.close(); - } catch (IOException e) { - e.printStackTrace(); - return e; - } - } - } + if (maxSizePropertyName != null) { + maxSize = launcherJobConf.getInt(maxSizePropertyName, maxSizeDefault); + } - return null; + if (propValue != null) { + File actionDataFile = new File(propValue); + if (localFsOperations.fileExists(actionDataFile)) { + actionData.put(actionDataPropertyName, localFsOperations.getLocalFileContentAsString(actionDataFile, type, maxSize)); } - }); - - if (ioe != null) { - throw ioe; } } - private static void failLauncher(int errorCode, String message, Throwable ex) { - ErrorHolder eHolder = new ErrorHolder(); - eHolder.setErrorCode(errorCode); - eHolder.setErrorMessage(message); - eHolder.setErrorCause(ex); - failLauncher(eHolder); - } - - private static void failLauncher(ErrorHolder eHolder) { - if (eHolder.getErrorCause() != null) { + private void updateActionDataWithFailure(ErrorHolder eHolder, Map<String, String> actionData) { + if (eHolder.getErrorCause() != null && eHolder.getErrorCause().getMessage() != null) { eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage()); } + Properties errorProps 
= new Properties(); errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode())); - errorProps.setProperty("error.reason", eHolder.getErrorMessage()); + String errorMessage = eHolder.getErrorMessage() == null ? "<empty>" : eHolder.getErrorMessage(); + errorProps.setProperty("error.reason", errorMessage); if (eHolder.getErrorCause() != null) { if (eHolder.getErrorCause().getMessage() != null) { errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage()); @@ -541,18 +500,19 @@ public class LauncherAM { pw.close(); errorProps.setProperty("exception.stacktrace", sw.toString()); } + StringWriter sw = new StringWriter(); try { errorProps.store(sw, ""); sw.close(); - actionData.put(ACTION_DATA_ERROR_PROPS, sw.toString()); + actionData.put(LauncherAM.ACTION_DATA_ERROR_PROPS, sw.toString()); // external child IDs - String externalChildIdsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS); + String externalChildIdsProp = System.getProperty(LauncherAM.ACTION_PREFIX + LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS); if (externalChildIdsProp != null) { File externalChildIDs = new File(externalChildIdsProp); - if (externalChildIDs.exists()) { - actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1)); + if (localFsOperations.fileExists(externalChildIDs)) { + actionData.put(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS, localFsOperations.getLocalFileContentAsString(externalChildIDs, ACTIONOUTPUTTYPE_EXT_CHILD_ID, -1)); } } } catch (IOException ioe) { @@ -568,50 +528,17 @@ public class LauncherAM { } } - private static class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler { - @Override - public void onContainersCompleted(List<ContainerStatus> containerStatuses) { - //noop - } - - @Override - public void onContainersAllocated(List<Container> containers) { - //noop - } - - @Override - public void onShutdownRequest() { - failLauncher(0, "ResourceManager requested AM Shutdown", 
null); - // TODO: OYA: interrupt? - } - - @Override - public void onNodesUpdated(List<NodeReport> nodeReports) { - //noop - } - - @Override - public float getProgress() { - return 0.5f; //TODO: OYA: maybe some action types can report better progress? - } - - @Override - public void onError(final Throwable ex) { - failLauncher(0, ex.getMessage(), ex); - // TODO: OYA: interrupt? - } - } - - public static String[] getMainArguments(Configuration conf) { + private String[] getMainArguments(Configuration conf) { String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)]; for (int i = 0; i < args.length; i++) { args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i); } + return args; } - private static class LauncherSecurityManager extends SecurityManager { + public static class LauncherSecurityManager extends SecurityManager { private boolean exitInvoked; private int exitCode; private SecurityManager securityManager; @@ -666,69 +593,20 @@ public class LauncherAM { } } + public enum OozieActionResult { + SUCCEEDED(FinalApplicationStatus.SUCCEEDED), + FAILED(FinalApplicationStatus.FAILED), + RUNNING(FinalApplicationStatus.SUCCEEDED); - /** - * Print files and directories in current directory. 
Will list files in the sub-directory (only 1 level deep) - */ - protected static void printContentsOfCurrentDir() { - File folder = new File("."); - System.out.println(); - System.out.println("Files in current dir:" + folder.getAbsolutePath()); - System.out.println("======================"); - - File[] listOfFiles = folder.listFiles(); - for (File fileName : listOfFiles) { - if (fileName.isFile()) { - System.out.println("File: " + fileName.getName()); - } else if (fileName.isDirectory()) { - System.out.println("Dir: " + fileName.getName()); - File subDir = new File(fileName.getName()); - File[] moreFiles = subDir.listFiles(); - for (File subFileName : moreFiles) { - if (subFileName.isFile()) { - System.out.println(" File: " + subFileName.getName()); - } else if (subFileName.isDirectory()) { - System.out.println(" Dir: " + subFileName.getName()); - } - } - } - } - } - - protected static Configuration readLauncherConf() { - File confFile = new File(LAUNCHER_JOB_CONF_XML); - Configuration conf = new Configuration(false); - conf.addResource(new Path(confFile.getAbsolutePath())); - return conf; - } - - protected static class ErrorHolder { - private int errorCode = 0; - private Throwable errorCause = null; - private String errorMessage = null; - - public int getErrorCode() { - return errorCode; - } - - public void setErrorCode(int errorCode) { - this.errorCode = errorCode; - } - - public Throwable getErrorCause() { - return errorCause; - } - - public void setErrorCause(Throwable errorCause) { - this.errorCause = errorCause; - } + // YARN-equivalent status + private FinalApplicationStatus yarnStatus; - public String getErrorMessage() { - return errorMessage; + OozieActionResult(FinalApplicationStatus yarnStatus) { + this.yarnStatus = yarnStatus; } - public void setErrorMessage(String errorMessage) { - this.errorMessage = errorMessage; + public FinalApplicationStatus getYarnStatus() { + return yarnStatus; } } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java index 23648b8..2972658 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java @@ -18,7 +18,7 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult; import java.io.IOException; import java.net.HttpURLConnection; @@ -26,7 +26,6 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.Proxy; import java.net.URL; -import java.util.EnumSet; // Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier /** @@ -36,7 +35,6 @@ import java.util.EnumSet; public class LauncherAMCallbackNotifier { private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback."; private static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000; - private static final EnumSet<FinalApplicationStatus> FAILED_APPLICATION_STATES = EnumSet.of(FinalApplicationStatus.KILLED, FinalApplicationStatus.FAILED); public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts"; public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval"; @@ -136,11 +134,11 @@ public class LauncherAMCallbackNotifier { /** * Notify a server of the completion of a submitted job. 
- * @param finalStatus The Application Status + * @param actionResult The Action Result (failed/succeeded/running) * * @throws InterruptedException */ - public void notifyURL(FinalApplicationStatus finalStatus, boolean backgroundAction) throws InterruptedException { + public void notifyURL(OozieActionResult actionResult) throws InterruptedException { // Do we need job-end notification? if (userUrl == null) { System.out.println("Callback notification URL not set, skipping."); @@ -149,12 +147,7 @@ public class LauncherAMCallbackNotifier { //Do string replacements for final status if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) { - // only send back "RUNNING" if the submission was successful - if (backgroundAction && !FAILED_APPLICATION_STATES.contains(finalStatus)) { - userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, "RUNNING"); - } else { - userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, finalStatus.toString()); - } + userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, actionResult.toString()); } // Create the URL, ensure sanity http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java new file mode 100644 index 0000000..688424b --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; + +public class LauncherAMCallbackNotifierFactory { + + public LauncherAMCallbackNotifier createCallbackNotifier(Configuration conf) { + return new LauncherAMCallbackNotifier(conf); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java new file mode 100644 index 0000000..011ce93 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.EnumSet; + +import org.apache.hadoop.conf.Configuration; + +public class LocalFsOperations { + private static final int WALK_DEPTH = 2; + + /** + * Reads the launcher configuration "launcher.xml" + * @return Configuration object + */ + public Configuration readLauncherConf() { + File confFile = new File(LauncherAM.LAUNCHER_JOB_CONF_XML); + Configuration conf = new Configuration(false); + conf.addResource(new org.apache.hadoop.fs.Path(confFile.getAbsolutePath())); + return conf; + } + + /** + * Print files and directories in current directory. 
Will list files in the sub-directory (only 2 levels deep) + * @throws IOException + */ + public void printContentsOfDir(File folder) throws IOException { + System.out.println(); + System.out.println("Files in current dir:" + folder.getAbsolutePath()); + System.out.println("======================"); + + final Path root = folder.toPath(); + Files.walkFileTree(root, EnumSet.of(FileVisitOption.FOLLOW_LINKS), WALK_DEPTH, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (attrs.isRegularFile()) { + System.out.println(" File: " + root.relativize(file)); + } else if (attrs.isDirectory()) { + System.out.println(" Dir: " + root.relativize(file) + "/"); + } + + return FileVisitResult.CONTINUE; + } + }); + } + + /** + * Returns the contents of a file as string. + * + * @param file the File object which represents the file to be read + * @param type Type of the file + * @param maxLen Maximum allowed length + * @return The file contents as string + * @throws IOException if the file is bigger than maxLen or there is any I/O error + * @throws FileNotFoundException if the file does not exist + */ + public String getLocalFileContentAsString(File file, String type, int maxLen) throws IOException { + if (file.exists()) { + if (maxLen > -1 && file.length() > maxLen) { + throw new IOException(type + " data exceeds its limit [" + maxLen + "]"); + } + + return com.google.common.io.Files.toString(file, StandardCharsets.UTF_8); + } else { + throw new FileNotFoundException("File not found: " + file.toPath().toAbsolutePath()); + } + } + + /** + * Checks if a given File exists or not. This method helps writing unit tests. 
+ */ + public boolean fileExists(File file) { + return file.exists(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java index 4a51d48..cb5b1ac 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java @@ -38,7 +38,9 @@ import javax.xml.parsers.ParserConfigurationException; * Utility class to perform operations on the prepare block of Workflow * */ +@Deprecated public class PrepareActionsDriver { + private static final PrepareActionsHandler prepareHandler = new PrepareActionsHandler(); /** * Method to parse the prepare XML and execute the corresponding prepare actions @@ -48,41 +50,7 @@ public class PrepareActionsDriver { */ static void doOperations(String prepareXML, Configuration conf) throws IOException, SAXException, ParserConfigurationException, LauncherException { - Document doc = getDocumentFromXML(prepareXML); - doc.getDocumentElement().normalize(); - - // Get the list of child nodes, basically, each one corresponding to a separate action - NodeList nl = doc.getDocumentElement().getChildNodes(); - LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf); - - for (int i = 0; i < nl.getLength(); ++i) { - Node n = nl.item(i); - String operation = n.getNodeName(); - if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) { - continue; - } - String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim(); - // use Path to avoid URIsyntax error caused by square bracket in glob - URI uri = new 
Path(pathStr).toUri(); - LauncherURIHandler handler = factory.getURIHandler(uri); - execute(operation, uri, handler, conf); - } - } - - /** - * Method to execute the prepare actions based on the command - * - * @param n Child node of the prepare XML - * @throws LauncherException - */ - private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf) - throws LauncherException { - if (operation.equals("delete")) { - handler.delete(uri, conf); - } - else if (operation.equals("mkdir")) { - handler.create(uri, conf); - } + prepareHandler.prepareAction(prepareXML, conf); } // Method to return the document from the prepare XML block http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java new file mode 100644 index 0000000..b5377b1 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +public class PrepareActionsHandler { + + /** + * Method to parse the prepare XML and execute the corresponding prepare actions + * + * @param prepareXML Prepare XML block in string format + * @throws LauncherException + */ + public void prepareAction(String prepareXML, Configuration conf) + throws IOException, SAXException, ParserConfigurationException, LauncherException { + Document doc = getDocumentFromXML(prepareXML); + doc.getDocumentElement().normalize(); + + // Get the list of child nodes, basically, each one corresponding to a separate action + NodeList nl = doc.getDocumentElement().getChildNodes(); + LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf); + + for (int i = 0; i < nl.getLength(); ++i) { + Node n = nl.item(i); + String operation = n.getNodeName(); + if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) { + continue; + } + String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim(); + // use Path to avoid URIsyntax error caused by square bracket in glob + URI uri = new Path(pathStr).toUri(); + LauncherURIHandler handler = factory.getURIHandler(uri); + execute(operation, uri, handler, conf); + } + } + + private void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf) + throws LauncherException { + + switch (operation) { + case "delete": 
+ handler.delete(uri, conf); + break; + + case "mkdir": + handler.create(uri, conf); + break; + + default: + System.out.println("Warning: unknown prepare operation " + operation + " -- skipping"); + } + } + + // Method to return the document from the prepare XML block + static Document getDocumentFromXML(String prepareXML) throws ParserConfigurationException, SAXException, + IOException { + DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); + docBuilderFactory.setNamespaceAware(true); + // support for includes in the xml file + docBuilderFactory.setXIncludeAware(true); + // ignore all comments inside the xml file + docBuilderFactory.setIgnoringComments(true); + docBuilderFactory.setExpandEntityReferences(false); + docBuilderFactory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); + DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); + InputStream is = new ByteArrayInputStream(prepareXML.getBytes("UTF-8")); + return docBuilder.parse(is); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java new file mode 100644 index 0000000..c73c13f --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; + +public class SequenceFileWriterFactory { + + public SequenceFile.Writer createSequenceFileWriter(Configuration launcherJobConf, Path finalPath, Class<?> keyClass, Class<?> valueClass) throws IOException { + return SequenceFile.createWriter(launcherJobConf, + SequenceFile.Writer.file(finalPath), + SequenceFile.Writer.keyClass(keyClass), + SequenceFile.Writer.valueClass(valueClass)); + } +}
