Repository: falcon Updated Branches: refs/heads/master 08fbf4f38 -> d81820082
FALCON-1102 Gather data transfer details of filesystem replication. Contributed by Peeyush Bishnoi. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d8182008 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d8182008 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d8182008 Branch: refs/heads/master Commit: d8182008269050e926b99c7111817d1e327c0dfd Parents: 08fbf4f Author: Sowmya Ramesh <[email protected]> Authored: Mon Oct 12 16:31:04 2015 -0700 Committer: Sowmya Ramesh <[email protected]> Committed: Mon Oct 12 16:31:04 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../resources/hdfs-replication-workflow.xml | 2 + .../InstanceRelationshipGraphBuilder.java | 30 +++++++ .../falcon/workflow/WorkflowExecutionArgs.java | 3 +- .../workflow/WorkflowExecutionContext.java | 64 ++++++++++++++ .../metadata/MetadataMappingServiceTest.java | 67 +++++++++++++- metrics/pom.xml | 27 ++++++ .../falcon/job/FSReplicationCounters.java | 44 ++++++++++ .../java/org/apache/falcon/job/JobCounters.java | 92 ++++++++++++++++++++ .../apache/falcon/job/JobCountersHandler.java | 41 +++++++++ .../java/org/apache/falcon/job/JobType.java | 26 ++++++ .../falcon/job/ReplicationJobCountersList.java | 61 +++++++++++++ .../falcon/job/FSReplicationCountersTest.java | 52 +++++++++++ .../feed/FSReplicationWorkflowBuilder.java | 1 + .../feed/FeedReplicationWorkflowBuilder.java | 25 ++++++ .../feed/OozieFeedWorkflowBuilderTest.java | 23 ++++- .../feed/fs-replication-feed-counters.xml | 59 +++++++++++++ replication/pom.xml | 4 + .../falcon/replication/FeedReplicator.java | 22 ++++- 19 files changed, 639 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt 
b/CHANGES.txt index ff8c40e..e6c8f28 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Trunk (Unreleased) FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) NEW FEATURES + FALCON-1102 Gather data transfer details of filesystem replication(Peeyush Bishnoi via Sowmya Ramesh) + FALCON-1316 Add supporting REST API calls for new UI(Balu Vellanki via Sowmya Ramesh) FALCON-1473 Feed SLA Miss Alerts through REST API(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml index 942421f..c1966be 100644 --- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml +++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml @@ -67,6 +67,8 @@ <arg>FILESYSTEM</arg> <arg>-availabilityFlag</arg> <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg> + <arg>-counterLogDir</arg> + <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? 
'' : srcClusterName}</arg> </java> <ok to="end"/> <error to="fail"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java index 016c622..f485764 100644 --- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java @@ -91,9 +91,25 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { addPipelines(process.getPipelines(), processInstance); } + addCounters(processInstance, context); + return processInstance; } + private void addCounters(Vertex processInstance, WorkflowExecutionContext context) throws FalconException { + String counterString = getCounterString(context); + if (!StringUtils.isBlank(counterString)) { + addCountersToInstance(counterString, processInstance); + } + } + + private String getCounterString(WorkflowExecutionContext context) { + if (!StringUtils.isBlank(context.getCounters())) { + return context.getCounters(); + } + return null; + } + public String getProcessInstanceName(WorkflowExecutionContext context) { return context.getEntityName() + "/" + context.getNominalTimeAsISO8601(); } @@ -118,6 +134,18 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { vertex.setProperty(optionName.getName(), value); } + private void addCountersToInstance(String counterString, Vertex vertex) throws FalconException { + String[] counterKeyValues = counterString.split(","); + try { + for (String counter : counterKeyValues) { + String[] keyVals = counter.split(":", 2); + vertex.setProperty(keyVals[0], Long.parseLong(keyVals[1])); + } + } catch 
(NumberFormatException e) { + throw new FalconException("Invalid values for counter:" + e); + } + } + public void addInstanceToEntity(Vertex instanceVertex, String entityName, RelationshipType entityType, RelationshipLabel edgeLabel) { addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null); @@ -200,6 +228,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601()); + + addCounters(feedInstanceVertex, context); } public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java index d2430a2..ac7140c 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java @@ -84,7 +84,8 @@ public enum WorkflowExecutionArgs { LOG_DIR("logDir", "log dir where lineage can be recorded"), CONTEXT_FILE("contextFile", "wf execution context file path where wf properties are recorded", false), - CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false); + CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false), + COUNTERS("counters", "store job counters", false); private final String name; http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 45b6d23..b870e3a 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -23,6 +23,7 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.EntityType; @@ -314,6 +315,10 @@ public class WorkflowExecutionContext { return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE)); } + public String getCounters() { + return getValue(WorkflowExecutionArgs.COUNTERS); + } + /** * this method is invoked from with in the workflow. * @@ -383,6 +388,33 @@ public class WorkflowExecutionContext { } + public static Path getCounterFile(String logDir) { + return new Path(logDir, "counter.txt"); + } + + public static String readCounters(FileSystem fs, Path counterFile) throws IOException{ + StringBuilder counterBuffer = new StringBuilder(); + BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(counterFile))); + try { + String line; + while ((line = in.readLine()) != null) { + counterBuffer.append(line); + counterBuffer.append(","); + } + } catch (IOException e) { + throw e; + } finally { + IOUtils.closeQuietly(in); + } + + String counterString = counterBuffer.toString(); + if (StringUtils.isNotBlank(counterString) && counterString.length() > 0) { + return counterString.substring(0, counterString.length() - 1); + } else { + return null; + } + } + public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException { return create(args, type, null); } @@ -408,10 +440,42 @@ public class 
WorkflowExecutionContext { executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE, getFilePath(executionContext.getLogDir(), executionContext.getEntityName(), executionContext.getEntityType(), executionContext.getOperation())); + addCounterToWF(executionContext); return executionContext; } + private static void addCounterToWF(WorkflowExecutionContext executionContext) throws FalconException { + if (executionContext.hasWorkflowFailed()) { + LOG.info("Workflow Instance failed, counter will not be added: {}", + executionContext.getWorkflowRunIdString()); + return; + } + + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( + new Path(executionContext.getLogDir()).toUri()); + Path counterFile = getCounterFile(executionContext.getLogDir()); + try { + if (fs.exists(counterFile)) { + String counters = readCounters(fs, counterFile); + if (StringUtils.isNotBlank(counters)) { + executionContext.context.put(WorkflowExecutionArgs.COUNTERS, counters); + } + } + } catch (IOException e) { + LOG.error("Error in accessing counter file :" + e); + } finally { + try { + if (fs.exists(counterFile)) { + fs.delete(counterFile, false); + } + fs.close(); + } catch (IOException e) { + LOG.error("Unable to delete counter file: {}", e); + } + } + } + private static CommandLine getCommand(String[] arguments) throws ParseException { Options options = new Options(); http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java index 89e8178..29f933d 100644 --- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java +++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java @@ -41,20 +41,23 @@ import 
org.apache.falcon.entity.v0.process.Inputs; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Outputs; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.retention.EvictedInstanceSerDe; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.Services; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; -import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations; import org.apache.falcon.workflow.WorkflowJobEndNotificationService; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -62,13 +65,15 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations; + /** * Test for Metadata relationship mapping service. 
*/ public class MetadataMappingServiceTest { public static final String FALCON_USER = "falcon-user"; - private static final String LOGS_DIR = "/falcon/staging/feed/logs"; + private static final String LOGS_DIR = "jail://global:00/falcon/staging/feed/logs"; private static final String NOMINAL_TIME = "2014-01-01-01-00"; public static final String CLUSTER_ENTITY_NAME = "primary-cluster"; @@ -97,6 +102,7 @@ public class MetadataMappingServiceTest { "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102"; public static final String OUTPUT_INSTANCE_PATHS_NO_DATE = "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2"; + public static final String COUNTERS = "TIMETAKEN:36956,COPY:30,BYTESCOPIED:1000"; public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory"; @@ -580,6 +586,26 @@ public class MetadataMappingServiceTest { Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2)); } + @Test + public void testLineageForJobCounter() throws Exception { + setupForJobCounters(); + WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( + EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "IGNORE", "IGNORE", "IGNORE", "NONE"), + WorkflowExecutionContext.Type.POST_PROCESSING); + service.onSuccess(context); + debug(service.getGraph()); + GraphUtils.dump(service.getGraph()); + Graph graph = service.getGraph(); + + Vertex vertex = graph.getVertices("name", "sample-process/2014-01-01T01:00Z").iterator().next(); + Assert.assertEquals(vertex.getProperty("TIMETAKEN"), 36956L); + Assert.assertEquals(vertex.getProperty("COPY"), 30L); + Assert.assertEquals(vertex.getProperty("BYTESCOPIED"), 1000L); + Assert.assertEquals(getVerticesCount(service.getGraph()), 9); + Assert.assertEquals(getEdgesCount(service.getGraph()), 14); + verifyLineageGraphForJobCounters(context); + } + private void verifyUpdatedEdges(Process newProcess) { Vertex 
processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY); @@ -946,6 +972,13 @@ public class MetadataMappingServiceTest { Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName()); } + private void verifyLineageGraphForJobCounters(WorkflowExecutionContext context) throws Exception { + Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME, + RelationshipType.PROCESS_ENTITY); + Assert.assertEquals(processVertex.getProperty("name"), PROCESS_ENTITY_NAME); + Assert.assertTrue(context.getCounters().length()>0); + } + private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames, String feedInstancePaths, String falconInputPaths, String falconInputFeeds) { @@ -995,6 +1028,36 @@ public class MetadataMappingServiceTest { }; } + private void setupForJobCounters() throws Exception { + cleanUp(); + service.init(); + // Add cluster + clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, + "classification=production"); + List<Feed> inFeeds = new ArrayList<>(); + List<Feed> outFeeds = new ArrayList<>(); + + createJobCountersFileForTest(); + // Add process + processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, + "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, + WORKFLOW_VERSION, inFeeds, outFeeds); + } + + private void createJobCountersFileForTest() throws Exception { + Path counterFile = new Path(LOGS_DIR, "counter.txt"); + OutputStream out = null; + try { + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( + new Path(LOGS_DIR).toUri()); + out = fs.create(counterFile); + out.write(COUNTERS.getBytes()); + out.flush(); + } finally { + out.close(); + } + } + private void setup() throws Exception { cleanUp(); service.init(); http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/pom.xml 
---------------------------------------------------------------------- diff --git a/metrics/pom.xml b/metrics/pom.xml index a0358db..36d9b50 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -32,6 +32,33 @@ <name>Apache Falcon Metrics</name> <packaging>jar</packaging> + <profiles> + <profile> + <id>hadoop-2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-distcp</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> + <dependencies> <dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java b/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java new file mode 100644 index 0000000..9dc7259 --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.job; + +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Obtain and store Filesystem Replication counters from FeedReplicator job. + */ +public class FSReplicationCounters extends JobCounters { + private static final Logger LOG = LoggerFactory.getLogger(FSReplicationCounters.class); + + public FSReplicationCounters() { + super(); + } + + + protected void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException { + if (isDistCp) { + populateReplicationCountersMap(jobCounters); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobCounters.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCounters.java b/metrics/src/main/java/org/apache/falcon/job/JobCounters.java new file mode 100644 index 0000000..275fbd5 --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/job/JobCounters.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.job; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.tools.mapred.CopyMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +/** + * Job Counters abstract class to be extended by supported job type. 
+ */ +public abstract class JobCounters { + private static final Logger LOG = LoggerFactory.getLogger(JobCounters.class); + protected Map<String, Long> countersMap = null; + + public JobCounters() { + countersMap = new HashMap<String, Long>(); + } + + public void obtainJobCounters(Configuration conf, Job job, boolean isDistCp) throws IOException { + try { + long timeTaken = job.getFinishTime() - job.getStartTime(); + countersMap.put(ReplicationJobCountersList.TIMETAKEN.getName(), timeTaken); + Counters jobCounters = job.getCounters(); + parseJob(job, jobCounters, isDistCp); + } catch (Exception e) { + LOG.info("Exception occurred while obtaining job counters: {}", e); + } + } + + protected void populateReplicationCountersMap(Counters jobCounters) { + for(CopyMapper.Counter copyCounterVal : CopyMapper.Counter.values()) { + if (ReplicationJobCountersList.getCountersKey(copyCounterVal.name()) != null) { + Counter counter = jobCounters.findCounter(copyCounterVal); + if (counter != null) { + String counterName = counter.getName(); + long counterValue = counter.getValue(); + countersMap.put(counterName, counterValue); + } + } + } + } + + public void storeJobCounters(Configuration conf, Path counterFile) throws IOException { + FileSystem sourceFs = FileSystem.get(conf); + OutputStream out = null; + try { + out = sourceFs.create(counterFile); + for (Map.Entry<String, Long> counter : countersMap.entrySet()) { + out.write((counter.getKey() + ":" + counter.getValue()).getBytes()); + out.write("\n".getBytes()); + } + out.flush(); + } finally { + IOUtils.closeQuietly(out); + } + } + + public Map<String, Long> getCountersMap() { + return countersMap; + } + + protected abstract void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java ---------------------------------------------------------------------- diff --git 
a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java new file mode 100644 index 0000000..e8b68ff --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.job; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job counters handler to initialize the required concrete class for obtaining job counters. 
+ */ +public final class JobCountersHandler { + private static final Logger LOG = LoggerFactory.getLogger(JobCountersHandler.class); + private JobCountersHandler() { + } + + public static JobCounters getCountersType(String jobType) { + if (jobType.equals(JobType.FSREPLICATION.name())) { + return new FSReplicationCounters(); + } + + LOG.error("JobType is not supported:" + jobType); + + return null; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobType.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/job/JobType.java b/metrics/src/main/java/org/apache/falcon/job/JobType.java new file mode 100644 index 0000000..456e57f --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/job/JobType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.job; + +/** + * Types of the job for which counters need to obtain. 
+ */ +public enum JobType { + FSREPLICATION +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java b/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java new file mode 100644 index 0000000..d8c3377 --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.job; + +/** + * List of counters for replication job. 
+ */ +public enum ReplicationJobCountersList { + TIMETAKEN("TIMETAKEN", "time taken by the distcp job"), + BYTESCOPIED("BYTESCOPIED", "number of bytes copied"), + COPY("COPY", "number of files copied"); + + private final String name; + private final String description; + + ReplicationJobCountersList(String name, String description) { + this.name = name; + this.description = description; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + public static ReplicationJobCountersList getCountersKey(String counterKey) { + if (counterKey != null) { + for (ReplicationJobCountersList value : ReplicationJobCountersList.values()) { + if (counterKey.equals(value.getName())) { + return value; + } + } + } + + return null; + } + + @Override + public String toString() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java ---------------------------------------------------------------------- diff --git a/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java b/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java new file mode 100644 index 0000000..abe8379 --- /dev/null +++ b/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.job; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Test for FS Replication Counters. + */ +public class FSReplicationCountersTest { + private List<String> countersList = new ArrayList<String>(); + private final String[] countersArgs = new String[] { "TIMETAKEN:5000", "BYTESCOPIED:1000L", "COPY:1" }; + + @BeforeClass + public void setUp() throws Exception { + for (String counters : countersArgs) { + String countersKey = counters.split(":")[0]; + countersList.add(countersKey); + } + } + + @Test + public void testObtainJobCounters() throws Exception { + for (String counters : countersArgs) { + String countersKey = counters.split(":")[0]; + Assert.assertEquals(countersKey, ReplicationJobCountersList.getCountersKey(countersKey).getName()); + } + + Assert.assertEquals(countersArgs.length, ReplicationJobCountersList.values().length); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java index b82f4e0..0dc09ee 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java +++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java @@ -56,6 +56,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE); addHDFSServersConfig(replication, src, target); addAdditionalReplicationProperties(replication); + enableCounters(replication); addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(replication); http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java index a7c19cd..5a62130 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java @@ -29,6 +29,7 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Property; import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.CONFIGURATION; @@ -37,6 +38,7 @@ import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; +import java.util.List; import java.util.Properties; /** @@ -47,11 +49,24 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW protected static final String REPLICATION_ACTION_NAME = "replication"; private static final String 
MR_MAX_MAPS = "maxMaps"; private static final String MR_MAP_BANDWIDTH = "mapBandwidth"; + private static final String REPLICATION_JOB_COUNTER = "job.counter"; public FeedReplicationWorkflowBuilder(Feed entity) { super(entity, LifeCycle.REPLICATION); } + public boolean isCounterEnabled() throws FalconException { + if (entity.getProperties() != null) { + List<Property> propertyList = entity.getProperties().getProperties(); + for (Property prop : propertyList) { + if (prop.getName().equals(REPLICATION_JOB_COUNTER) && "true".equalsIgnoreCase(prop.getValue())) { + return true; + } + } + } + return false; + } + @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException { Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName()); @@ -99,6 +114,16 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW } return action; } + + protected ACTION enableCounters(ACTION action) throws FalconException { + if (isCounterEnabled()) { + List<String> args = action.getJava().getArg(); + args.add("-counterLogDir"); + args.add("${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? 
'' : srcClusterName}"); + } + return action; + } + protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException; @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index cfce1ae..5e93027 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -91,6 +91,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { private Feed fsReplFeed; private Feed lifecycleRetentionFeed; private Feed retentionFeed; + private Feed fsReplFeedCounter; private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml"; private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml"; @@ -99,6 +100,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml"; private static final String FS_RETENTION_LIFECYCLE_FEED = "/feed/fs-retention-lifecycle-feed.xml"; private static final String FS_RETENTION_ORIG_FEED = "/feed/fs-retention-feed.xml"; + private static final String FS_REPLICATION_FEED_COUNTER = "/feed/fs-replication-feed-counters.xml"; @BeforeClass public void setUpDFS() throws Exception { @@ -129,6 +131,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { feed = (Feed) storeEntity(EntityType.FEED, FEED); fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED); + fsReplFeedCounter = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED_COUNTER); tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED); 
lifecycleRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_LIFECYCLE_FEED); retentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_ORIG_FEED); @@ -336,6 +339,18 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster, pathsWithPartitions); } + @Test + public void testReplicationWithCounters() throws Exception { + OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeedCounter, Tag.REPLICATION); + List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster, new Path("/alpha/falcon")); + final COORDINATORAPP alphaCoord = getCoordinator(trgMiniDFS, + alphaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH)); + Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z"); + Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z"); + String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeedCounter); + assertReplCoord(alphaCoord, fsReplFeedCounter, alphaTrgCluster, pathsWithPartitions); + } + private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster, Feed aFeed) throws FalconException { String srcPart = FeedHelper.normalizePartitionExpression( @@ -363,12 +378,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate)); WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord); - assertWorkflowDefinition(fsReplFeed, workflow, false); + assertWorkflowDefinition(aFeed, workflow, false); ACTION replicationActionNode = getAction(workflow, "replication"); JAVA replication = replicationActionNode.getJava(); List<String> args = replication.getArg(); - Assert.assertEquals(args.size(), 15); + if (args.contains("-counterLogDir")) { + Assert.assertEquals(args.size(), 17); + } else { + Assert.assertEquals(args.size(), 15); + } HashMap<String, String> props = getCoordProperties(coord); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/test/resources/feed/fs-replication-feed-counters.xml ---------------------------------------------------------------------- diff --git a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml new file mode 100644 index 0000000..230e2b0 --- /dev/null +++ b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml @@ -0,0 +1,59 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<feed description="billing RC File" name="replication-test-counter" xmlns="uri:falcon:feed:0.1"> + <partitions> + <partition name="colo"/> + <partition name="eventTime"/> + <partition name="impressionHour"/> + <partition name="pricingModel"/> + </partitions> + + <groups>online,bi</groups> + + <frequency>minutes(5)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="minutes(1)"/> + + <clusters> + <cluster partition="${cluster.colo}" type="source" name="corp1"> + <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/> + <retention action="delete" limit="days(10000)"/> + </cluster> + <cluster type="target" name="alpha"> + <validity end="2012-10-01T12:11Z" start="2012-10-01T12:05Z"/> + <retention action="delete" limit="days(10000)"/> + <locations> + <location path="/localDC/rc/billing/ua1/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/> + </locations> + </cluster> + </clusters> + + <locations> + <location path="/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/> + <location path="/data/regression/fetlrc/billing/stats" type="stats"/> + <location path="/data/regression/fetlrc/billing/metadata" type="meta"/> + </locations> + + <ACL permission="0x755" group="group" owner="fetl"/> + <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/> + <properties> + <property name="maxMaps" value="33" /> + <property name="mapBandwidth" value="2" /> + <property name="job.counter" value="true" /> + </properties> +</feed> http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/replication/pom.xml ---------------------------------------------------------------------- diff --git a/replication/pom.xml b/replication/pom.xml index 3cc96fc..78c50f3 100644 --- a/replication/pom.xml +++ b/replication/pom.xml @@ -59,6 +59,10 @@ <groupId>org.apache.falcon</groupId> <artifactId>falcon-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-metrics</artifactId> 
+ </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java ---------------------------------------------------------------------- diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java index a226058..e97e84e 100644 --- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java +++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java @@ -27,12 +27,17 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.Storage; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.job.JobCountersHandler; +import org.apache.falcon.job.JobType; +import org.apache.falcon.job.JobCounters; import org.apache.falcon.util.ReplicationDistCpOption; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.util.Tool; @@ -89,7 +94,18 @@ public class FeedReplicator extends Configured implements Tool { ? 
new CustomReplicator(conf, options) : new DistCp(conf, options); LOG.info("Started DistCp"); - distCp.execute(); + Job job = distCp.execute(); + + if (cmd.hasOption("counterLogDir") + && job.getStatus().getState() == JobStatus.State.SUCCEEDED) { + LOG.info("Gathering counters for the Feed Replication job"); + Path counterFile = new Path(cmd.getOptionValue("counterLogDir"), "counter.txt"); + JobCounters fsReplicationCounters = JobCountersHandler.getCountersType(JobType.FSREPLICATION.name()); + if (fsReplicationCounters != null) { + fsReplicationCounters.obtainJobCounters(conf, job, true); + fsReplicationCounters.storeJobCounters(conf, counterFile); + } + } if (includePathSet) { executePostProcessing(conf, options); // this only applies for FileSystem Storage. @@ -161,6 +177,10 @@ public class FeedReplicator extends Configured implements Tool { opt.setRequired(false); options.addOption(opt); + opt = new Option("counterLogDir", true, "log directory to store job counter file"); + opt.setRequired(false); + options.addOption(opt); + return new GnuParser().parse(options, args); }
