Repository: falcon Updated Branches: refs/heads/master c980aa800 -> 54a88b814
FALCON-2175 java.lang.IllegalArgumentException in LogMover service Author: Pallavi Rao <[email protected]> Reviewers: @sandeepSamudrala, @praveen8927 Closes #292 from pallavi-rao/2175 and squashes the following commits: 38e16ee [Pallavi Rao] Fixing checkstyle issue 144c588 [Pallavi Rao] Revert "FALCON-1821 Update git pull merge script to accept and update JIRA type" 3848b10 [Pallavi Rao] FALCON-2175 java.lang.IllegalArgumentException in LogMover service 5ecb344 [Pallavi Rao] FALCON-2175 java.lang.IllegalArgumentException in LogMover service be17164 [Pallavi Rao] Merge remote-tracking branch 'upstream/master' a6d8c6c [Pallavi Rao] FALCON-1821 Update git pull merge script to accept and update JIRA type Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/54a88b81 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/54a88b81 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/54a88b81 Branch: refs/heads/master Commit: 54a88b8146ddaad9e230a8f090fff0f825cc778c Parents: c980aa8 Author: Pallavi Rao <[email protected]> Authored: Tue Nov 15 15:39:27 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Nov 15 15:39:27 2016 +0530 ---------------------------------------------------------------------- .../workflow/WorkflowExecutionContext.java | 2 +- .../falcon/messaging/JMSMessageConsumer.java | 46 ++++++++++---------- .../org/apache/falcon/logging/JobLogMover.java | 14 ++++-- 3 files changed, 36 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/54a88b81/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index cccbe3b..d8040f0 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -170,7 +170,7 @@ public class WorkflowExecutionContext { return getValue(WorkflowExecutionArgs.LOG_FILE); } - String getNominalTime() { + public String getNominalTime() { return getValue(WorkflowExecutionArgs.NOMINAL_TIME); } http://git-wip-us.apache.org/repos/asf/falcon/blob/54a88b81/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index 5383e7f..db22460 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -18,6 +18,25 @@ package org.apache.falcon.messaging; +import java.lang.reflect.InvocationTargetException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; @@ -35,26 +54,6 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import java.lang.reflect.InvocationTargetException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.TimeZone; - /** * Subscribes to the falcon topic for handling retries and alerts. */ @@ -156,9 +155,12 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener { wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first); wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name()); wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user")); - wfProperties.put(WorkflowExecutionArgs.OPERATION, getOperation(appName).name()); + WorkflowExecutionContext.EntityOperations operation = getOperation(appName); + wfProperties.put(WorkflowExecutionArgs.OPERATION, operation.name()); + String subflowId = (operation.equals(WorkflowExecutionContext.EntityOperations.GENERATE)) + ? "@user-action" : ""; wfProperties.put(WorkflowExecutionArgs.USER_SUBFLOW_ID, - json.getString("id").concat("@user-action")); + json.getString("id").concat(subflowId)); String appType = message.getStringProperty("appType"); return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType)); http://git-wip-us.apache.org/repos/asf/falcon/blob/54a88b81/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java index 72c3dc5..535f62a 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java +++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java @@ -18,6 +18,7 @@ package org.apache.falcon.logging; +import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.EngineType; import org.apache.falcon.hadoop.HadoopClientFactory; @@ -84,17 +85,23 @@ public class JobLogMover { public int run(WorkflowExecutionContext context) { try { - OozieClient client = new OozieClient(context.getWorkflowEngineUrl()); + String engineUrl = context.getWorkflowEngineUrl(); + if (StringUtils.isBlank(engineUrl)) { + LOG.warn("Unable to retrieve workflow url for {} with status {} ", + context.getWorkflowId(), context.getWorkflowStatus()); + return 0; + } + OozieClient client = new OozieClient(engineUrl); WorkflowJob jobInfo; try { - jobInfo = client.getJobInfo(context.getUserSubflowId()); + jobInfo = client.getJobInfo(context.getWorkflowId()); } catch (OozieClientException e) { LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e); return 0; } //Assumption is - Each wf run will have a directory //the corresponding job logs are stored within the respective dir - Path path = new Path(context.getLogDir() + "/" + Path path = new Path(context.getLogDir() + "/" + context.getNominalTime() + "/" + String.format("%03d", context.getWorkflowRunId())); FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), getConf()); @@ -117,6 +124,7 @@ public class JobLogMover { ||context.getUserWorkflowEngine().equals("hive")) { flowId = jobInfo.getId(); } else { + jobInfo = client.getJobInfo(context.getUserSubflowId()); // if process wf with oozie engine flowId = jobInfo.getExternalId(); }
