FALCON-1038 Log mover fails for map-reduce action. Contributed by Peeyush Bishnoi.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fbb4d314 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fbb4d314 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fbb4d314 Branch: refs/heads/0.7 Commit: fbb4d314297e10b723e7acf53a50426e50333037 Parents: 17e2f71 Author: Ajay Yadava <[email protected]> Authored: Tue Aug 25 14:00:37 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Aug 25 17:20:45 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/logging/DefaultTaskLogRetriever.java | 2 +- .../org/apache/falcon/logging/JobLogMover.java | 47 ++++++++++++-------- .../falcon/logging/TaskLogRetrieverYarn.java | 11 ++++- 4 files changed, 42 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a1054fe..f7e9127 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -91,6 +91,8 @@ Trunk (Unreleased) (Suhas Vasu) BUG FIXES + FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay Yadava) + FALCON-1412 Process waits indefinitely and finally timedout even though missing dependencies are met(Pallavi Rao via Ajay Yadava) FALCON-1409 Update API throws NullPointerException(Sandeep Samudrala via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java index 962f891..82448d8 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java +++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java @@ -61,7 +61,7 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRet } } - protected List<String> getFromHistory(String jodId) throws IOException { + protected List<String> getFromHistory(String jobId) throws IOException { return null; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java index ba669c8..478d68c 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java +++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java @@ -91,17 +91,26 @@ public class JobLogMover { } } } else { - // if process wf with oozie engine - String subflowId = jobInfo.getExternalId(); - copyOozieLog(client, fs, path, subflowId); - WorkflowJob subflowInfo = client.getJobInfo(subflowId); + String flowId; + // if process wf with pig, hive + if (context.getUserWorkflowEngine().equals("pig") + ||context.getUserWorkflowEngine().equals("hive")) { + flowId = jobInfo.getId(); + } else { + // if process wf with oozie engine + flowId = jobInfo.getExternalId(); + } + copyOozieLog(client, fs, path, flowId); + WorkflowJob subflowInfo = client.getJobInfo(flowId); List<WorkflowAction> actions = subflowInfo.getActions(); for (WorkflowAction action : actions) { - if (action.getType().equals("pig") - || action.getType().equals("java")) { + if (isActionTypeSupported(action)) { + LOG.info("Copying hadoop TT log for action: {} of type: {}", + action.getName(), action.getType()); copyTTlogs(fs, path, action); } else { - LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName()); + LOG.info("Ignoring hadoop TT log for non supported action: {} of type: {}", + action.getName(), action.getType()); } } } @@ -114,8 +123,8 @@ public class JobLogMover { } private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) { - // userWorkflowEngine will be null for replication and "pig" for pig - return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE; + // userWorkflowEngine will be null for replication and "not null" for pig, hive, oozie + return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) == null; } private void copyOozieLog(OozieClient client, FileSystem fs, Path path, @@ -134,7 +143,7 @@ public class JobLogMover { for (String ttLogURL : ttLogUrls) { LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL); InputStream in = getURLinputStream(new URL(ttLogURL)); - OutputStream out = fs.create(new Path(path, action.getName() + "_" + OutputStream out = fs.create(new Path(path, action.getName() + "_" + action.getType() + "_" + getMappedStatus(action.getStatus()) + "-" + index + ".log")); IOUtils.copyBytes(in, out, 4096, true); LOG.info("Copied log to {}", path); @@ -143,6 +152,13 @@ public class JobLogMover { } } + private boolean isActionTypeSupported(WorkflowAction action) { + return action.getType().equals("pig") + || action.getType().equals("hive") + || action.getType().equals("java") + || action.getType().equals("map-reduce"); + } + private String getMappedStatus(WorkflowAction.Status status) { if (status == WorkflowAction.Status.FAILED || status == WorkflowAction.Status.KILLED @@ -161,14 +177,9 @@ public class JobLogMover { @SuppressWarnings("unchecked") private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName(Configuration conf) { - try { - if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) { - return TaskLogRetrieverYarn.class; - } - return (Class<? extends TaskLogURLRetriever>) - Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1"); - } catch (ClassNotFoundException e) { - LOG.warn("V1 Retriever missing, falling back to Default retriever"); + if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) { + return TaskLogRetrieverYarn.class; + } else { return DefaultTaskLogRetriever.class; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java index 61c5afb..146d53c 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java +++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java @@ -48,13 +48,22 @@ public class TaskLogRetrieverYarn extends DefaultTaskLogRetriever { LOG.warn("External id for workflow action is null"); return null; } + + if (conf.get(YARN_LOG_SERVER_URL) == null) { + LOG.warn("YARN log Server is null"); + return null; + } + try { Job job = cluster.getJob(jobID); if (job != null) { TaskCompletionEvent[] events = job.getTaskCompletionEvents(0); for (TaskCompletionEvent event : events) { LogParams params = cluster.getLogParams(jobID, event.getTaskAttemptId()); - String url = SCHEME + conf.get(YARN_LOG_SERVER_URL) + "/" + String url = (conf.get(YARN_LOG_SERVER_URL).startsWith(SCHEME) + ? conf.get(YARN_LOG_SERVER_URL) + : SCHEME + conf.get(YARN_LOG_SERVER_URL)) + + "/" + event.getTaskTrackerHttp() + "/" + params.getContainerId() + "/" + params.getApplicationId() + "/"
