Repository: oozie
Updated Branches:
  refs/heads/master 249280cbf -> abb508780


OOZIE-2035 NotificationXCommand should support proxy


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/abb50878
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/abb50878
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/abb50878

Branch: refs/heads/master
Commit: abb50878083f5bd1671dc27d7861c97bea87b999
Parents: 249280c
Author: Purshotam Shah <[email protected]>
Authored: Tue Dec 2 11:01:25 2014 -0800
Committer: Purshotam Shah <[email protected]>
Committed: Tue Dec 2 11:01:25 2014 -0800

----------------------------------------------------------------------
 .../org/apache/oozie/client/OozieClient.java    |   4 +
 .../oozie/command/NotificationXCommand.java     | 137 +++++++++++++++++++
 .../coord/CoordActionNotificationXCommand.java  |  94 ++-----------
 .../oozie/command/wf/ActionKillXCommand.java    |   2 +-
 .../oozie/command/wf/ActionStartXCommand.java   |   2 +-
 .../apache/oozie/command/wf/ActionXCommand.java |   2 +-
 .../apache/oozie/command/wf/KillXCommand.java   |   2 +-
 .../oozie/command/wf/NotificationXCommand.java  | 137 -------------------
 .../apache/oozie/command/wf/ResumeXCommand.java |   2 +-
 .../apache/oozie/command/wf/SignalXCommand.java |   8 +-
 .../oozie/command/wf/SuspendXCommand.java       |   2 +-
 .../wf/WorkflowNotificationXCommand.java        | 136 ++++++++++++++++++
 core/src/main/resources/oozie-default.xml       |   8 ++
 .../TestCoordActionNotificationXCommand.java    |   3 +-
 .../command/wf/TestNotificationXCommand.java    |  78 -----------
 .../wf/TestWorkflowNotificationXCommand.java    |  79 +++++++++++
 .../site/twiki/CoordinatorFunctionalSpec.twiki  |   3 +
 .../src/site/twiki/WorkflowFunctionalSpec.twiki |   5 +
 release-log.txt                                 |   1 +
 19 files changed, 395 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java 
b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index 5e53a18..23a8f21 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -103,10 +103,14 @@ public class OozieClient {
 
     public static final String WORKFLOW_NOTIFICATION_URL = 
"oozie.wf.workflow.notification.url";
 
+    public static final String WORKFLOW_NOTIFICATION_PROXY = 
"oozie.wf.workflow.notification.proxy";
+
     public static final String ACTION_NOTIFICATION_URL = 
"oozie.wf.action.notification.url";
 
     public static final String COORD_ACTION_NOTIFICATION_URL = 
"oozie.coord.action.notification.url";
 
+    public static final String COORD_ACTION_NOTIFICATION_PROXY = 
"oozie.coord.action.notification.proxy";
+
     public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";
 
     public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java 
b/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java
new file mode 100644
index 0000000..68359aa
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/NotificationXCommand.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.URL;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.XLog;
+
+public abstract class NotificationXCommand extends XCommand<Void> {
+
+    public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = 
"oozie.notification.url.connection.timeout";
+    public static final String NOTIFICATION_PROXY_KEY = 
"oozie.notification.proxy";
+
+    protected int retries = 0;
+    protected String jobId;
+    protected String url;
+    protected String proxyConf;
+
+    public NotificationXCommand(String name, String type, int priority) {
+        super(name, type, priority);
+    }
+
+    @Override
+    final protected boolean isLockRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return jobId;
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+        sendNotification();
+        return null;
+    }
+
+    @Override
+    protected void setLogInfo() {
+        LogUtils.setLogInfo(jobId);
+    }
+
+    protected Proxy getProxy(String proxyConf) {
+        // Configure the proxy to use if its set. It should be set like
+        // proxyType@proxyHostname:port
+        if (proxyConf != null && !proxyConf.trim().equals("") && 
proxyConf.lastIndexOf(":") != -1) {
+            int typeIndex = proxyConf.indexOf("@");
+            Proxy.Type proxyType = Proxy.Type.HTTP;
+            if (typeIndex != -1 && proxyConf.substring(0, 
typeIndex).compareToIgnoreCase("socks") == 0) {
+                proxyType = Proxy.Type.SOCKS;
+            }
+            String hostname = proxyConf.substring(typeIndex + 1, 
proxyConf.lastIndexOf(":"));
+            String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 
1);
+            try {
+                int port = Integer.parseInt(portConf);
+                LOG.info("Workflow notification using proxy type \"" + 
proxyType + "\" hostname \"" + hostname
+                        + "\" and port \"" + port + "\"");
+                return new Proxy(proxyType, new InetSocketAddress(hostname, 
port));
+            }
+            catch (NumberFormatException nfe) {
+                LOG.warn("Workflow notification couldn't parse configured 
proxy's port " + portConf
+                        + ". Not going to use a proxy");
+            }
+        }
+        return Proxy.NO_PROXY;
+    }
+
+    protected void handleRetry() {
+        if (retries < 3) {
+            retries++;
+            this.resetUsed();
+            queue(this, 60 * 1000);
+        }
+        else {
+            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
+        }
+    }
+
+    protected void sendNotification() {
+        if (url != null) {
+            Proxy proxy = getProxy(proxyConf);
+            try {
+                URL url = new URL(this.url);
+                HttpURLConnection urlConn = (HttpURLConnection) 
url.openConnection(proxy);
+                urlConn.setConnectTimeout(getTimeOut());
+                urlConn.setReadTimeout(getTimeOut());
+                if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                    handleRetry();
+                }
+            }
+            catch (IOException ex) {
+                handleRetry();
+            }
+        }
+        else {
+            LOG.info("No Notification URL is defined. Therefore nothing to 
notify for job " + jobId);
+
+        }
+
+    }
+
+    private int getTimeOut() {
+        return 
ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY);
+    }
+
+    public void setRetry(int retries) {
+        this.retries = retries;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
index 2556152..d51f0d7 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
@@ -20,52 +20,40 @@ package org.apache.oozie.command.coord;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.command.wf.NotificationXCommand;
-import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.command.NotificationXCommand;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLog;
 
 /**
  * This class will send the notification for the coordinator action
  */
-public class CoordActionNotificationXCommand extends CoordinatorXCommand<Void> 
{
+public class CoordActionNotificationXCommand extends NotificationXCommand {
 
     private final CoordinatorActionBean actionBean;
     private static final String STATUS_PATTERN = "\\$status";
     private static final String ACTION_ID_PATTERN = "\\$actionId";
 
-    //this variable is package private only for test purposes
+    // this variable is package private only for test purposes
     int retries = 0;
 
     public CoordActionNotificationXCommand(CoordinatorActionBean actionBean) {
         super("coord_action_notification", "coord_action_notification", 0);
         ParamChecker.notNull(actionBean, "Action Bean");
         this.actionBean = actionBean;
-    }
+        jobId = actionBean.getId();
 
-    @Override
-    protected void setLogInfo() {
-        LogUtils.setLogInfo(actionBean.getId());
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#execute()
-     */
     @Override
-    protected Void execute() throws CommandException {
-        LOG.info("STARTED Coordinator Notification actionId=" + 
actionBean.getId() + " : " + actionBean.getStatus());
+    protected void loadState() throws CommandException {
         Configuration conf;
         try {
             conf = new XConfiguration(new 
StringReader(actionBean.getRunConf()));
@@ -74,78 +62,16 @@ public class CoordActionNotificationXCommand extends 
CoordinatorXCommand<Void> {
             LOG.warn("Configuration parse error. read from DB :" + 
actionBean.getRunConf());
             throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
         }
-        String url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL);
+        url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL);
         if (url != null) {
             url = url.replaceAll(ACTION_ID_PATTERN, actionBean.getId());
             url = url.replaceAll(STATUS_PATTERN, 
actionBean.getStatus().toString());
-            LOG.debug("Notification URL :" + url);
-            try {
-                int timeout = ConfigurationService.getInt(NotificationXCommand
-                        .NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY);
-                URL urlObj = new URL(url);
-                HttpURLConnection urlConn = (HttpURLConnection) 
urlObj.openConnection();
-                urlConn.setConnectTimeout(timeout);
-                urlConn.setReadTimeout(timeout);
-                if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
-                    handleRetry(url);
-                }
-            }
-            catch (IOException ex) {
-                handleRetry(url);
-            }
-        }
-        else {
-            LOG.info("No Notification URL is defined. Therefore nothing to 
notify for job " + actionBean.getJobId()
-                    + " action ID " + actionBean.getId());
-        }
-        LOG.info("ENDED Coordinator Notification actionId=" + 
actionBean.getId());
-        return null;
-    }
+            proxyConf = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_PROXY,
+                    Services.get().getConf().get(NOTIFICATION_PROXY_KEY));
+            LOG.debug("Proxy :" + proxyConf);
 
-    /**
-     * This method handles the retry for the coordinator action.
-     *
-     * @param url This is the URL where the notification has to be sent.
-     */
-    private void handleRetry(String url) {
-        if (retries < 3) {
-            retries++;
-            this.resetUsed();
-            queue(this, 60 * 1000);
-        }
-        else {
-            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
         }
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#getEntityKey()
-     */
-    @Override
-    public String getEntityKey() {
-        return actionBean.getId();
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#isLockRequired()
-     */
-    @Override
-    protected boolean isLockRequired() {
-        return false;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#loadState()
-     */
-    @Override
-    protected void loadState() throws CommandException {
+        LOG.debug("Notification URL :" + url);
         LogUtils.setLogInfo(actionBean);
     }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
-     */
-    @Override
-    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
-    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
index 6073e4d..33498bf 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
@@ -155,7 +155,7 @@ public class ActionKillXCommand extends 
ActionXCommand<Void> {
                     if(slaEvent != null) {
                         insertList.add(slaEvent);
                     }
-                    queue(new NotificationXCommand(wfJob, wfAction));
+                    queue(new WorkflowNotificationXCommand(wfJob, wfAction));
                 }
                 catch (ActionExecutorException ex) {
                     wfAction.resetPending();

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 373c942..d4048a1 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -261,7 +261,7 @@ public class ActionStartXCommand extends 
ActionXCommand<Void> {
                         wfAction.setErrorInfo(START_DATA_MISSING, "Execution 
Started, but Start Data Missing from Action");
                         failJob(context);
                     } else {
-                        queue(new NotificationXCommand(wfJob, wfAction));
+                        queue(new WorkflowNotificationXCommand(wfJob, 
wfAction));
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index 98fddfd..9e7243a 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -195,7 +195,7 @@ public abstract class ActionXCommand<T> extends 
WorkflowXCommand<Void> {
                 workflow.setStatus(WorkflowJob.Status.FAILED);
                 action.setStatus(WorkflowAction.Status.FAILED);
                 action.resetPending();
-                queue(new NotificationXCommand(workflow, action));
+                queue(new WorkflowNotificationXCommand(workflow, action));
                 queue(new KillXCommand(workflow.getId()));
                 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 
1, getInstrumentation());
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
index cac1381..d39b93d 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
@@ -183,7 +183,7 @@ public class KillXCommand extends WorkflowXCommand<Void> {
             if (EventHandlerService.isEnabled()) {
                 generateEvent(wfJob);
             }
-            queue(new NotificationXCommand(wfJob));
+            queue(new WorkflowNotificationXCommand(wfJob));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java
deleted file mode 100644
index 0fc3d65..0000000
--- a/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.command.wf;
-
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.util.LogUtils;
-import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.util.XLog;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-
-public class NotificationXCommand extends WorkflowXCommand<Void> {
-
-    public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = 
"oozie.notification.url.connection.timeout";
-
-    private static final String STATUS_PATTERN = "\\$status";
-    private static final String JOB_ID_PATTERN = "\\$jobId";
-    private static final String NODE_NAME_PATTERN = "\\$nodeName";
-
-    private String url;
-    private String id;
-
-    //this variable is package private only for test purposes
-    int retries = 0;
-
-    public NotificationXCommand(WorkflowJobBean workflow) {
-        super("job.notification", "job.notification", 0);
-        ParamChecker.notNull(workflow, "workflow");
-        id = workflow.getId();
-        url = 
workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL);
-        if (url != null) {
-            url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
-            url = url.replaceAll(STATUS_PATTERN, 
workflow.getStatus().toString());
-        }
-    }
-
-    public NotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean 
action) {
-        super("action.notification", "job.notification", 0);
-        ParamChecker.notNull(workflow, "workflow");
-        ParamChecker.notNull(action, "action");
-        id = action.getId();
-        url = 
workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL);
-        if (url != null) {
-            url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
-            url = url.replaceAll(NODE_NAME_PATTERN, action.getName());
-            if (action.isComplete()) {
-                url = url.replaceAll(STATUS_PATTERN, "T:" + 
action.getTransition());
-            }
-            else {
-                url = url.replaceAll(STATUS_PATTERN, "S:" + 
action.getStatus().toString());
-            }
-        }
-    }
-
-    @Override
-    protected void setLogInfo() {
-        LogUtils.setLogInfo(id);
-    }
-
-    @Override
-    protected boolean isLockRequired() {
-        return false;
-    }
-
-    @Override
-    public String getEntityKey() {
-        return url;
-    }
-
-    @Override
-    protected void loadState() throws CommandException {
-    }
-
-    @Override
-    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
-    }
-
-    @Override
-    protected Void execute() throws CommandException {
-        if (url != null) {
-            int timeout = 
ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY);
-            try {
-                URL url = new URL(this.url);
-                HttpURLConnection urlConn = (HttpURLConnection) 
url.openConnection();
-                urlConn.setConnectTimeout(timeout);
-                urlConn.setReadTimeout(timeout);
-                if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
-                    handleRetry();
-                }
-            }
-            catch (IOException ex) {
-                handleRetry();
-            }
-        }
-        return null;
-    }
-
-    private void handleRetry() {
-        if (retries < 3) {
-            retries++;
-            this.resetUsed();
-            queue(this, 60 * 1000);
-        }
-        else {
-            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
-        }
-    }
-
-    public String getUrl() {
-        return url;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
index 73c1447..edbd767 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
@@ -142,7 +142,7 @@ public class ResumeXCommand extends WorkflowXCommand<Void> {
                 if (EventHandlerService.isEnabled()) {
                     generateEvent(workflow);
                 }
-                queue(new NotificationXCommand(workflow));
+                queue(new WorkflowNotificationXCommand(workflow));
             }
             return null;
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 5ff0e5c..7ca8646 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -184,7 +184,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
                 // 2. Add SLA registration events for all WF_ACTIONS
                 
createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), 
wfJob.getUser(),
                         wfJob.getGroup(), wfJob.getConf());
-                queue(new NotificationXCommand(wfJob));
+                queue(new WorkflowNotificationXCommand(wfJob));
             }
             else {
                 throw new CommandException(ErrorCode.E0801, wfJob.getId());
@@ -209,7 +209,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
             wfAction.resetPending();
             if (!skipAction) {
                 
wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
-                queue(new NotificationXCommand(wfJob, wfAction));
+                queue(new WorkflowNotificationXCommand(wfJob, wfAction));
             }
             updateList.add(new 
UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS,
                     wfAction));
@@ -243,7 +243,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
                         wfJobErrorCode = actionToFail.getErrorCode();
                         wfJobErrorMsg = actionToFail.getErrorMessage();
                     }
-                    queue(new NotificationXCommand(wfJob, actionToFail));
+                    queue(new WorkflowNotificationXCommand(wfJob, 
actionToFail));
                     SLAEventBean slaEvent = 
SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
                             Status.FAILED, SlaAppType.WORKFLOW_ACTION);
                     if (slaEvent != null) {
@@ -279,7 +279,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
             if (slaEvent != null) {
                 insertList.add(slaEvent);
             }
-            queue(new NotificationXCommand(wfJob));
+            queue(new WorkflowNotificationXCommand(wfJob));
             if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
                 
InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, 
getInstrumentation());
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
index 199af36..ef97990 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
@@ -70,7 +70,7 @@ public class SuspendXCommand extends WorkflowXCommand<Void> {
             updateList.add(new 
UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
                     this.wfJobBean));
             
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, 
updateList, null);
-            queue(new NotificationXCommand(this.wfJobBean));
+            queue(new WorkflowNotificationXCommand(this.wfJobBean));
             //Calling suspend recursively to handle parent workflow
             suspendParentWorkFlow();
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java
new file mode 100644
index 0000000..7ea0e1e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XLog;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class WorkflowNotificationXCommand extends WorkflowXCommand<Void> {
+
+    public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = 
"oozie.notification.url.connection.timeout";
+
+    private static final String STATUS_PATTERN = "\\$status";
+    private static final String JOB_ID_PATTERN = "\\$jobId";
+    private static final String NODE_NAME_PATTERN = "\\$nodeName";
+
+    private String url;
+    private String id;
+
+    //this variable is package private only for test purposes
+    int retries = 0;
+
+    public WorkflowNotificationXCommand(WorkflowJobBean workflow) {
+        super("job.notification", "job.notification", 0);
+        ParamChecker.notNull(workflow, "workflow");
+        id = workflow.getId();
+        url = 
workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL);
+        if (url != null) {
+            url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
+            url = url.replaceAll(STATUS_PATTERN, 
workflow.getStatus().toString());
+        }
+    }
+
+    public WorkflowNotificationXCommand(WorkflowJobBean workflow, 
WorkflowActionBean action) {
+        super("action.notification", "job.notification", 0);
+        ParamChecker.notNull(workflow, "workflow");
+        ParamChecker.notNull(action, "action");
+        id = action.getId();
+        url = 
workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL);
+        if (url != null) {
+            url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
+            url = url.replaceAll(NODE_NAME_PATTERN, action.getName());
+            if (action.isComplete()) {
+                url = url.replaceAll(STATUS_PATTERN, "T:" + 
action.getTransition());
+            }
+            else {
+                url = url.replaceAll(STATUS_PATTERN, "S:" + 
action.getStatus().toString());
+            }
+        }
+    }
+
+    @Override
+    protected void setLogInfo() {
+        LogUtils.setLogInfo(id);
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return url;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+        if (url != null) {
+            int timeout = 
ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY);
+            try {
+                URL url = new URL(this.url);
+                HttpURLConnection urlConn = (HttpURLConnection) 
url.openConnection();
+                urlConn.setConnectTimeout(timeout);
+                urlConn.setReadTimeout(timeout);
+                if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                    handleRetry();
+                }
+            }
+            catch (IOException ex) {
+                handleRetry();
+            }
+        }
+        return null;
+    }
+
+    private void handleRetry() {
+        if (retries < 3) {
+            retries++;
+            this.resetUsed();
+            queue(this, 60 * 1000);
+        }
+        else {
+            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
+        }
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index 6a768b3..3d07c6f 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2350,4 +2350,12 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.notification.proxy</name>
+        <value></value>
+        <description>
+         System level proxy setting for job notifications.
+        </description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java
index b58ecd9..8ca404f 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionNotificationXCommand.java
@@ -21,8 +21,9 @@ package org.apache.oozie.command.coord;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.NotificationXCommand;
 import org.apache.oozie.command.wf.HangServlet;
-import org.apache.oozie.command.wf.NotificationXCommand;
+import org.apache.oozie.command.wf.WorkflowNotificationXCommand;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.EmbeddedServletContainer;
 import org.apache.oozie.test.XTestCase;

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java
deleted file mode 100644
index ad2fbd7..0000000
--- 
a/core/src/test/java/org/apache/oozie/command/wf/TestNotificationXCommand.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.command.wf;
-
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.EmbeddedServletContainer;
-import org.apache.oozie.test.XTestCase;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.workflow.WorkflowInstance;
-import org.junit.Assert;
-import org.mockito.Mockito;
-
-public class TestNotificationXCommand extends XTestCase {
-    private EmbeddedServletContainer container;
-
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        
setSystemProperty(NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY, 
"50");
-        Services services = new Services();
-        services.init();
-        container = new EmbeddedServletContainer("blah");
-        container.addServletEndpoint("/hang/*", HangServlet.class);
-        container.start();
-    }
-
-    @Override
-    public void tearDown() throws Exception {
-        try {
-            container.stop();
-        }
-        catch (Exception ex) {
-        }
-        try {
-            Services.get().destroy();
-        }
-        catch (Exception ex) {
-        }
-        super.tearDown();
-    }
-
-    public void testWFNotificationTimeout() throws Exception {
-        XConfiguration conf = new XConfiguration();
-        conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, 
container.getServletURL("/hang/*"));
-        WorkflowInstance wfi = Mockito.mock(WorkflowInstance.class);
-        Mockito.when(wfi.getConf()).thenReturn(conf);
-        WorkflowJobBean workflow = Mockito.mock(WorkflowJobBean.class);
-        Mockito.when(workflow.getId()).thenReturn("1");
-        
Mockito.when(workflow.getStatus()).thenReturn(WorkflowJob.Status.SUCCEEDED);
-        Mockito.when(workflow.getWorkflowInstance()).thenReturn(wfi);
-        NotificationXCommand command = new NotificationXCommand(workflow);
-        command.retries = 3;
-        long start = System.currentTimeMillis();
-        command.call();
-        long end = System.currentTimeMillis();
-        Assert.assertTrue(end - start >= 50);
-        Assert.assertTrue(end - start < 10000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java
new file mode 100644
index 0000000..b427180
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.NotificationXCommand;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.EmbeddedServletContainer;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+public class TestWorkflowNotificationXCommand extends XTestCase {
+    private EmbeddedServletContainer container;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        
setSystemProperty(NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY, 
"50");
+        Services services = new Services();
+        services.init();
+        container = new EmbeddedServletContainer("blah");
+        container.addServletEndpoint("/hang/*", HangServlet.class);
+        container.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            container.stop();
+        }
+        catch (Exception ex) {
+        }
+        try {
+            Services.get().destroy();
+        }
+        catch (Exception ex) {
+        }
+        super.tearDown();
+    }
+
+    public void testWFNotificationTimeout() throws Exception {
+        XConfiguration conf = new XConfiguration();
+        conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, 
container.getServletURL("/hang/*"));
+        WorkflowInstance wfi = Mockito.mock(WorkflowInstance.class);
+        Mockito.when(wfi.getConf()).thenReturn(conf);
+        WorkflowJobBean workflow = Mockito.mock(WorkflowJobBean.class);
+        Mockito.when(workflow.getId()).thenReturn("1");
+        
Mockito.when(workflow.getStatus()).thenReturn(WorkflowJob.Status.SUCCEEDED);
+        Mockito.when(workflow.getWorkflowInstance()).thenReturn(wfi);
+        WorkflowNotificationXCommand command = new 
WorkflowNotificationXCommand(workflow);
+        command.retries = 3;
+        long start = System.currentTimeMillis();
+        command.call();
+        long end = System.currentTimeMillis();
+        Assert.assertTrue(end - start >= 50);
+        Assert.assertTrue(end - start < 10000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki 
b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
index 7f8c0c6..8894649 100644
--- a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+++ b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
@@ -3525,6 +3525,9 @@ See also 
[[WorkflowFunctionalSpec#WorkflowNotifications][Workflow Notifications]
 
 If the =oozie.coord.workflow.notification.url= property is present in the 
coordinator job properties when submitting the job,
 Oozie will make a notification to the provided URL when any of the 
coordinator's actions changes its status.
+The =oozie.coord.action.notification.proxy= property can be used to configure 
either an HTTP or a SOCKS proxy.
+The format is proxyHostname:port or proxyType@proxyHostname:port. If the proxy 
type is not specified, it defaults to http.
+For example: myhttpproxyhost.mydomain.com:80 or socks@mysockshost.mydomain.com:1080.
 
 If the URL contains any of the following tokens, they will be replaced with 
the actual values by Oozie before making
 the notification:

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki 
b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
index e6e52a0..21199e8 100644
--- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
+++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
@@ -2245,6 +2245,10 @@ See also 
[[CoordinatorFunctionalSpec#CoordinatorNotifications][Coordinator Notif
 
 If the =oozie.wf.workflow.notification.url= property is present in the 
workflow job properties when submitting the job,
 Oozie will make a notification to the provided URL when the workflow job 
changes its status.
+The =oozie.wf.workflow.notification.proxy= property can be used to configure 
either an HTTP or a SOCKS proxy.
+The format is proxyHostname:port or proxyType@proxyHostname:port. If the proxy 
type is not specified, it defaults to http.
+For example: myhttpproxyhost.mydomain.com:80 or socks@mysockshost.mydomain.com:1080.
+
 
 If the URL contains any of the following tokens, they will be replaced with 
the actual values by Oozie before making
 the notification:
@@ -2257,6 +2261,7 @@ the notification:
 If the =oozie.wf.action.notification.url= property is present in the workflow 
job properties when submitting the job,
 Oozie will make a notification to the provided URL every time the workflow job 
enters and exits an action node. For
 decision nodes, Oozie will send a single notification with the name of the 
first evaluation that resolved to =true=.
+The =oozie.wf.workflow.notification.proxy= property can be used to configure 
a proxy; it should contain the proxy host and port (for example, xyz:4080).
 
 If the URL contains any of the following tokens, they will be replaced with 
the actual values by Oozie before making
 the notification:

http://git-wip-us.apache.org/repos/asf/oozie/blob/abb50878/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4b5b050..9b8708e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2035 NotificationXCommand should support proxy (puru)
 OOZIE-2065 Oozie returns incorrect total action for coord dryrun (puru)
 OOZIE-2069 RecoveryService reads incorrect configuration (puru)
 OOZIE-2074 Compatibility issue with Yarn and Hadoop 0.23/2.x 
(jaydeepvishwakarma via shwethags)

Reply via email to