Repository: oozie Updated Branches: refs/heads/master 249280cbf -> abb508780
OOZIE-2035 NotificationXCommand should support proxy Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/abb50878 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/abb50878 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/abb50878 Branch: refs/heads/master Commit: abb50878083f5bd1671dc27d7861c97bea87b999 Parents: 249280c Author: Purshotam Shah <[email protected]> Authored: Tue Dec 2 11:01:25 2014 -0800 Committer: Purshotam Shah <[email protected]> Committed: Tue Dec 2 11:01:25 2014 -0800 ---------------------------------------------------------------------- .../org/apache/oozie/client/OozieClient.java | 4 + .../oozie/command/NotificationXCommand.java | 137 +++++++++++++++++++ .../coord/CoordActionNotificationXCommand.java | 94 ++----------- .../oozie/command/wf/ActionKillXCommand.java | 2 +- .../oozie/command/wf/ActionStartXCommand.java | 2 +- .../apache/oozie/command/wf/ActionXCommand.java | 2 +- .../apache/oozie/command/wf/KillXCommand.java | 2 +- .../oozie/command/wf/NotificationXCommand.java | 137 ------------------- .../apache/oozie/command/wf/ResumeXCommand.java | 2 +- .../apache/oozie/command/wf/SignalXCommand.java | 8 +- .../oozie/command/wf/SuspendXCommand.java | 2 +- .../wf/WorkflowNotificationXCommand.java | 136 ++++++++++++++++++ core/src/main/resources/oozie-default.xml | 8 ++ .../TestCoordActionNotificationXCommand.java | 3 +- .../command/wf/TestNotificationXCommand.java | 78 ----------- .../wf/TestWorkflowNotificationXCommand.java | 79 +++++++++++ .../site/twiki/CoordinatorFunctionalSpec.twiki | 3 + .../src/site/twiki/WorkflowFunctionalSpec.twiki | 5 + release-log.txt | 1 + 19 files changed, 395 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/client/src/main/java/org/apache/oozie/client/OozieClient.java 
---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index 5e53a18..23a8f21 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -103,10 +103,14 @@ public class OozieClient { public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url"; + public static final String WORKFLOW_NOTIFICATION_PROXY = "oozie.wf.workflow.notification.proxy"; + public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url"; public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url"; + public static final String COORD_ACTION_NOTIFICATION_PROXY = "oozie.coord.action.notification.proxy"; + public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes"; public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes"; http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java b/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java new file mode 100644 index 0000000..68359aa --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.command; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.URL; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.util.LogUtils; +import org.apache.oozie.util.XLog; + +public abstract class NotificationXCommand extends XCommand<Void> { + + public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout"; + public static final String NOTIFICATION_PROXY_KEY = "oozie.notification.proxy"; + + protected int retries = 0; + protected String jobId; + protected String url; + protected String proxyConf; + + public NotificationXCommand(String name, String type, int priority) { + super(name, type, priority); + } + + @Override + final protected boolean isLockRequired() { + return false; + } + + @Override + public String getEntityKey() { + return jobId; + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + + } + + @Override + protected Void execute() throws CommandException { + sendNotification(); + return null; + } + + @Override + protected void setLogInfo() { + LogUtils.setLogInfo(jobId); + } + + protected Proxy getProxy(String proxyConf) { + // Configure the proxy to use if its set. 
It should be set like + // proxyType@proxyHostname:port + if (proxyConf != null && !proxyConf.trim().equals("") && proxyConf.lastIndexOf(":") != -1) { + int typeIndex = proxyConf.indexOf("@"); + Proxy.Type proxyType = Proxy.Type.HTTP; + if (typeIndex != -1 && proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) { + proxyType = Proxy.Type.SOCKS; + } + String hostname = proxyConf.substring(typeIndex + 1, proxyConf.lastIndexOf(":")); + String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1); + try { + int port = Integer.parseInt(portConf); + LOG.info("Workflow notification using proxy type \"" + proxyType + "\" hostname \"" + hostname + + "\" and port \"" + port + "\""); + return new Proxy(proxyType, new InetSocketAddress(hostname, port)); + } + catch (NumberFormatException nfe) { + LOG.warn("Workflow notification couldn't parse configured proxy's port " + portConf + + ". Not going to use a proxy"); + } + } + return Proxy.NO_PROXY; + } + + protected void handleRetry() { + if (retries < 3) { + retries++; + this.resetUsed(); + queue(this, 60 * 1000); + } + else { + LOG.warn(XLog.OPS, "could not send notification [{0}]", url); + } + } + + protected void sendNotification() { + if (url != null) { + Proxy proxy = getProxy(proxyConf); + try { + URL url = new URL(this.url); + HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(proxy); + urlConn.setConnectTimeout(getTimeOut()); + urlConn.setReadTimeout(getTimeOut()); + if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { + handleRetry(); + } + } + catch (IOException ex) { + handleRetry(); + } + } + else { + LOG.info("No Notification URL is defined. 
Therefore nothing to notify for job " + jobId); + + } + + } + + private int getTimeOut() { + return ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY); + } + + public void setRetry(int retries) { + this.retries = retries; + + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java index 2556152..d51f0d7 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java @@ -20,52 +20,40 @@ package org.apache.oozie.command.coord; import java.io.IOException; import java.io.StringReader; -import java.net.HttpURLConnection; -import java.net.URL; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; -import org.apache.oozie.command.PreconditionException; -import org.apache.oozie.command.wf.NotificationXCommand; -import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.command.NotificationXCommand; import org.apache.oozie.service.Services; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XLog; /** * This class will send the notification for the coordinator action */ -public class CoordActionNotificationXCommand extends CoordinatorXCommand<Void> { +public class CoordActionNotificationXCommand extends NotificationXCommand { private final CoordinatorActionBean actionBean; private static final 
String STATUS_PATTERN = "\\$status"; private static final String ACTION_ID_PATTERN = "\\$actionId"; - //this variable is package private only for test purposes + // this variable is package private only for test purposes int retries = 0; public CoordActionNotificationXCommand(CoordinatorActionBean actionBean) { super("coord_action_notification", "coord_action_notification", 0); ParamChecker.notNull(actionBean, "Action Bean"); this.actionBean = actionBean; - } + jobId = actionBean.getId(); - @Override - protected void setLogInfo() { - LogUtils.setLogInfo(actionBean.getId()); } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#execute() - */ @Override - protected Void execute() throws CommandException { - LOG.info("STARTED Coordinator Notification actionId=" + actionBean.getId() + " : " + actionBean.getStatus()); + protected void loadState() throws CommandException { Configuration conf; try { conf = new XConfiguration(new StringReader(actionBean.getRunConf())); @@ -74,78 +62,16 @@ public class CoordActionNotificationXCommand extends CoordinatorXCommand<Void> { LOG.warn("Configuration parse error. 
read from DB :" + actionBean.getRunConf()); throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); } - String url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL); + url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL); if (url != null) { url = url.replaceAll(ACTION_ID_PATTERN, actionBean.getId()); url = url.replaceAll(STATUS_PATTERN, actionBean.getStatus().toString()); - LOG.debug("Notification URL :" + url); - try { - int timeout = ConfigurationService.getInt(NotificationXCommand - .NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY); - URL urlObj = new URL(url); - HttpURLConnection urlConn = (HttpURLConnection) urlObj.openConnection(); - urlConn.setConnectTimeout(timeout); - urlConn.setReadTimeout(timeout); - if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { - handleRetry(url); - } - } - catch (IOException ex) { - handleRetry(url); - } - } - else { - LOG.info("No Notification URL is defined. Therefore nothing to notify for job " + actionBean.getJobId() - + " action ID " + actionBean.getId()); - } - LOG.info("ENDED Coordinator Notification actionId=" + actionBean.getId()); - return null; - } + proxyConf = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_PROXY, + Services.get().getConf().get(NOTIFICATION_PROXY_KEY)); + LOG.debug("Proxy :" + proxyConf); - /** - * This method handles the retry for the coordinator action. - * - * @param url This is the URL where the notification has to be sent. 
- */ - private void handleRetry(String url) { - if (retries < 3) { - retries++; - this.resetUsed(); - queue(this, 60 * 1000); - } - else { - LOG.warn(XLog.OPS, "could not send notification [{0}]", url); } - } - - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#getEntityKey() - */ - @Override - public String getEntityKey() { - return actionBean.getId(); - } - - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#isLockRequired() - */ - @Override - protected boolean isLockRequired() { - return false; - } - - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#loadState() - */ - @Override - protected void loadState() throws CommandException { + LOG.debug("Notification URL :" + url); LogUtils.setLogInfo(actionBean); } - - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#verifyPrecondition() - */ - @Override - protected void verifyPrecondition() throws CommandException, PreconditionException { - } } http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java index 6073e4d..33498bf 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java @@ -155,7 +155,7 @@ public class ActionKillXCommand extends ActionXCommand<Void> { if(slaEvent != null) { insertList.add(slaEvent); } - queue(new NotificationXCommand(wfJob, wfAction)); + queue(new WorkflowNotificationXCommand(wfJob, wfAction)); } catch (ActionExecutorException ex) { wfAction.resetPending(); http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java index 373c942..d4048a1 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java @@ -261,7 +261,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> { wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action"); failJob(context); } else { - queue(new NotificationXCommand(wfJob, wfAction)); + queue(new WorkflowNotificationXCommand(wfJob, wfAction)); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java index 98fddfd..9e7243a 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java @@ -195,7 +195,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { workflow.setStatus(WorkflowJob.Status.FAILED); action.setStatus(WorkflowAction.Status.FAILED); action.resetPending(); - queue(new NotificationXCommand(workflow, action)); + queue(new WorkflowNotificationXCommand(workflow, action)); queue(new KillXCommand(workflow.getId())); InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java index cac1381..d39b93d 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java @@ -183,7 +183,7 @@ public class KillXCommand extends WorkflowXCommand<Void> { if (EventHandlerService.isEnabled()) { generateEvent(wfJob); } - queue(new NotificationXCommand(wfJob)); + queue(new WorkflowNotificationXCommand(wfJob)); } catch (JPAExecutorException e) { throw new CommandException(e); http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java deleted file mode 100644 index 0fc3d65..0000000 --- a/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.oozie.command.wf; - -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.command.CommandException; -import org.apache.oozie.command.PreconditionException; -import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.Services; -import org.apache.oozie.util.LogUtils; -import org.apache.oozie.util.ParamChecker; -import org.apache.oozie.util.XLog; - -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; - -public class NotificationXCommand extends WorkflowXCommand<Void> { - - public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout"; - - private static final String STATUS_PATTERN = "\\$status"; - private static final String JOB_ID_PATTERN = "\\$jobId"; - private static final String NODE_NAME_PATTERN = "\\$nodeName"; - - private String url; - private String id; - - //this variable is package private only for test purposes - int retries = 0; - - public NotificationXCommand(WorkflowJobBean workflow) { - super("job.notification", "job.notification", 0); - ParamChecker.notNull(workflow, "workflow"); - id = workflow.getId(); - url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL); - if (url != null) { - url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); - url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString()); - } - } - - public NotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean action) { - super("action.notification", "job.notification", 0); - ParamChecker.notNull(workflow, "workflow"); - ParamChecker.notNull(action, "action"); - id = action.getId(); - url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL); - if (url != null) { - url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); - url = url.replaceAll(NODE_NAME_PATTERN, 
action.getName()); - if (action.isComplete()) { - url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition()); - } - else { - url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString()); - } - } - } - - @Override - protected void setLogInfo() { - LogUtils.setLogInfo(id); - } - - @Override - protected boolean isLockRequired() { - return false; - } - - @Override - public String getEntityKey() { - return url; - } - - @Override - protected void loadState() throws CommandException { - } - - @Override - protected void verifyPrecondition() throws CommandException, PreconditionException { - } - - @Override - protected Void execute() throws CommandException { - if (url != null) { - int timeout = ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY); - try { - URL url = new URL(this.url); - HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(); - urlConn.setConnectTimeout(timeout); - urlConn.setReadTimeout(timeout); - if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { - handleRetry(); - } - } - catch (IOException ex) { - handleRetry(); - } - } - return null; - } - - private void handleRetry() { - if (retries < 3) { - retries++; - this.resetUsed(); - queue(this, 60 * 1000); - } - else { - LOG.warn(XLog.OPS, "could not send notification [{0}]", url); - } - } - - public String getUrl() { - return url; - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java index 73c1447..edbd767 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java @@ -142,7 +142,7 @@ public class ResumeXCommand extends WorkflowXCommand<Void> { if 
(EventHandlerService.isEnabled()) { generateEvent(workflow); } - queue(new NotificationXCommand(workflow)); + queue(new WorkflowNotificationXCommand(workflow)); } return null; } http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java index 5ff0e5c..7ca8646 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java @@ -184,7 +184,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { // 2. Add SLA registration events for all WF_ACTIONS createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob.getGroup(), wfJob.getConf()); - queue(new NotificationXCommand(wfJob)); + queue(new WorkflowNotificationXCommand(wfJob)); } else { throw new CommandException(ErrorCode.E0801, wfJob.getId()); @@ -209,7 +209,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { wfAction.resetPending(); if (!skipAction) { wfAction.setTransition(workflowInstance.getTransition(wfAction.getName())); - queue(new NotificationXCommand(wfJob, wfAction)); + queue(new WorkflowNotificationXCommand(wfJob, wfAction)); } updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS, wfAction)); @@ -243,7 +243,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { wfJobErrorCode = actionToFail.getErrorCode(); wfJobErrorMsg = actionToFail.getErrorMessage(); } - queue(new NotificationXCommand(wfJob, actionToFail)); + queue(new WorkflowNotificationXCommand(wfJob, actionToFail)); SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION); if 
(slaEvent != null) { @@ -279,7 +279,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { if (slaEvent != null) { insertList.add(slaEvent); } - queue(new NotificationXCommand(wfJob)); + queue(new WorkflowNotificationXCommand(wfJob)); if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) { InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java index 199af36..ef97990 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java @@ -70,7 +70,7 @@ public class SuspendXCommand extends WorkflowXCommand<Void> { updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, this.wfJobBean)); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); - queue(new NotificationXCommand(this.wfJobBean)); + queue(new WorkflowNotificationXCommand(this.wfJobBean)); //Calling suspend recursively to handle parent workflow suspendParentWorkFlow(); } http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java new file mode 100644 index 0000000..7ea0e1e --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.wf; + +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.util.LogUtils; +import org.apache.oozie.util.ParamChecker; +import org.apache.oozie.util.XLog; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; + +public class WorkflowNotificationXCommand extends WorkflowXCommand<Void> { + + public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout"; + + private static final String STATUS_PATTERN = "\\$status"; + private static final String JOB_ID_PATTERN = "\\$jobId"; + private static final String NODE_NAME_PATTERN = "\\$nodeName"; + + private String url; + private String id; + + //this variable is package private only for test purposes + int retries = 0; + + public WorkflowNotificationXCommand(WorkflowJobBean workflow) { + super("job.notification", "job.notification", 0); + ParamChecker.notNull(workflow, 
"workflow"); + id = workflow.getId(); + url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL); + if (url != null) { + url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); + url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString()); + } + } + + public WorkflowNotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean action) { + super("action.notification", "job.notification", 0); + ParamChecker.notNull(workflow, "workflow"); + ParamChecker.notNull(action, "action"); + id = action.getId(); + url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL); + if (url != null) { + url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); + url = url.replaceAll(NODE_NAME_PATTERN, action.getName()); + if (action.isComplete()) { + url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition()); + } + else { + url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString()); + } + } + } + + @Override + protected void setLogInfo() { + LogUtils.setLogInfo(id); + } + + @Override + protected boolean isLockRequired() { + return false; + } + + @Override + public String getEntityKey() { + return url; + } + + @Override + protected void loadState() throws CommandException { + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + } + + @Override + protected Void execute() throws CommandException { + if (url != null) { + int timeout = ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY); + try { + URL url = new URL(this.url); + HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(); + urlConn.setConnectTimeout(timeout); + urlConn.setReadTimeout(timeout); + if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { + handleRetry(); + } + } + catch (IOException ex) { + handleRetry(); + } + } + return null; + } + + private void handleRetry() { + if (retries < 3) { + retries++; + this.resetUsed(); + queue(this, 
60 * 1000); + } + else { + LOG.warn(XLog.OPS, "could not send notification [{0}]", url); + } + } + + public String getUrl() { + return url; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 6a768b3..3d07c6f 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2350,4 +2350,12 @@ </description> </property> + <property> + <name>oozie.notification.proxy</name> + <value></value> + <description> + System level proxy setting for job notifications. + </description> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java index b58ecd9..8ca404f 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java @@ -21,8 +21,9 @@ package org.apache.oozie.command.coord; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.OozieClient; +import org.apache.oozie.command.NotificationXCommand; import org.apache.oozie.command.wf.HangServlet; -import org.apache.oozie.command.wf.NotificationXCommand; +import org.apache.oozie.command.wf.WorkflowNotificationXCommand; import org.apache.oozie.service.Services; import org.apache.oozie.test.EmbeddedServletContainer; import org.apache.oozie.test.XTestCase; 
http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java deleted file mode 100644 index ad2fbd7..0000000 --- a/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.oozie.command.wf; - -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.EmbeddedServletContainer; -import org.apache.oozie.test.XTestCase; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.workflow.WorkflowInstance; -import org.junit.Assert; -import org.mockito.Mockito; - -public class TestNotificationXCommand extends XTestCase { - private EmbeddedServletContainer container; - - @Override - public void setUp() throws Exception { - super.setUp(); - setSystemProperty(NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY, "50"); - Services services = new Services(); - services.init(); - container = new EmbeddedServletContainer("blah"); - container.addServletEndpoint("/hang/*", HangServlet.class); - container.start(); - } - - @Override - public void tearDown() throws Exception { - try { - container.stop(); - } - catch (Exception ex) { - } - try { - Services.get().destroy(); - } - catch (Exception ex) { - } - super.tearDown(); - } - - public void testWFNotificationTimeout() throws Exception { - XConfiguration conf = new XConfiguration(); - conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, container.getServletURL("/hang/*")); - WorkflowInstance wfi = Mockito.mock(WorkflowInstance.class); - Mockito.when(wfi.getConf()).thenReturn(conf); - WorkflowJobBean workflow = Mockito.mock(WorkflowJobBean.class); - Mockito.when(workflow.getId()).thenReturn("1"); - Mockito.when(workflow.getStatus()).thenReturn(WorkflowJob.Status.SUCCEEDED); - Mockito.when(workflow.getWorkflowInstance()).thenReturn(wfi); - NotificationXCommand command = new NotificationXCommand(workflow); - command.retries = 3; - long start = System.currentTimeMillis(); - command.call(); - long end = System.currentTimeMillis(); - Assert.assertTrue(end - start >= 50); - Assert.assertTrue(end - start < 10000); - } -} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java new file mode 100644 index 0000000..b427180 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.wf; + +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.NotificationXCommand; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.EmbeddedServletContainer; +import org.apache.oozie.test.XTestCase; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.workflow.WorkflowInstance; +import org.junit.Assert; +import org.mockito.Mockito; + +public class TestWorkflowNotificationXCommand extends XTestCase { + private EmbeddedServletContainer container; + + @Override + public void setUp() throws Exception { + super.setUp(); + setSystemProperty(NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY, "50"); + Services services = new Services(); + services.init(); + container = new EmbeddedServletContainer("blah"); + container.addServletEndpoint("/hang/*", HangServlet.class); + container.start(); + } + + @Override + public void tearDown() throws Exception { + try { + container.stop(); + } + catch (Exception ex) { + } + try { + Services.get().destroy(); + } + catch (Exception ex) { + } + super.tearDown(); + } + + public void testWFNotificationTimeout() throws Exception { + XConfiguration conf = new XConfiguration(); + conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, container.getServletURL("/hang/*")); + WorkflowInstance wfi = Mockito.mock(WorkflowInstance.class); + Mockito.when(wfi.getConf()).thenReturn(conf); + WorkflowJobBean workflow = Mockito.mock(WorkflowJobBean.class); + Mockito.when(workflow.getId()).thenReturn("1"); + Mockito.when(workflow.getStatus()).thenReturn(WorkflowJob.Status.SUCCEEDED); + Mockito.when(workflow.getWorkflowInstance()).thenReturn(wfi); + WorkflowNotificationXCommand command = new WorkflowNotificationXCommand(workflow); + command.retries = 3; + long start = System.currentTimeMillis(); + command.call(); + long end = System.currentTimeMillis(); + 
Assert.assertTrue(end - start >= 50); + Assert.assertTrue(end - start < 10000); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki index 7f8c0c6..8894649 100644 --- a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki +++ b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki @@ -3525,6 +3525,9 @@ See also [[WorkflowFunctionalSpec#WorkflowNotifications][Workflow Notifications] If the =oozie.coord.workflow.notification.url= property is present in the coordinator job properties when submitting the job, Oozie will make a notification to the provided URL when any of the coordinator's actions changes its status. +=oozie.coord.action.notification.proxy= property can be used to configure either a http or socks proxy. +The format is proxyHostname:port or proxyType@proxyHostname:port. If proxy type is not specified, it defaults to http. +For eg: myhttpproxyhost.mydomain.com:80 or socks@mysockshost.mydomain.com:1080. 
If the URL contains any of the following tokens, they will be replaced with the actual values by Oozie before making the notification: http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/docs/src/site/twiki/WorkflowFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki index e6e52a0..21199e8 100644 --- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki +++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki @@ -2245,6 +2245,10 @@ See also [[CoordinatorFunctionalSpec#CoordinatorNotifications][Coordinator Notif If the =oozie.wf.workflow.notification.url= property is present in the workflow job properties when submitting the job, Oozie will make a notification to the provided URL when the workflow job changes its status. +=oozie.wf.workflow.notification.proxy= property can be used to configure either a http or socks proxy. +The format is proxyHostname:port or proxyType@proxyHostname:port. If proxy type is not specified, it defaults to http. +For eg: myhttpproxyhost.mydomain.com:80 or socks@mysockshost.mydomain.com:1080. + If the URL contains any of the following tokens, they will be replaced with the actual values by Oozie before making the notification: @@ -2257,6 +2261,7 @@ the notification: If the =oozie.wf.action.notification.url= property is present in the workflow job properties when submitting the job, Oozie will make a notification to the provided URL every time the workflow job enters and exits an action node. For decision nodes, Oozie will send a single notification with the name of the first evaluation that resolved to =true=. +=oozie.wf.workflow.notification.proxy= property can be used to configure proxy, it should contain proxy host and port (xyz:4080). 
If the URL contains any of the following tokens, they will be replaced with the actual values by Oozie before making the notification: http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 4b5b050..9b8708e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2035 NotificationXCommand should support proxy (puru) OOZIE-2065 Oozie returns incorrect total action for coord dryrun (puru) OOZIE-2069 RecoveryService reads incorrect configuration (puru) OOZIE-2074 Compatibility issue with Yarn and Hadoop 0.23/2.x (jaydeepvishwakarma via shwethags)
