Repository: falcon Updated Branches: refs/heads/master 2bf90130d -> baab41425
FALCON-1480 Gather data transfer details of Hive DR. Contributed by Peeyush Bishnoi. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/baab4142 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/baab4142 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/baab4142 Branch: refs/heads/master Commit: baab41425fbdf2e7e6dc4d55195e826d540b444c Parents: 2bf9013 Author: Ajay Yadava <[email protected]> Authored: Thu Nov 26 15:27:54 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Nov 26 17:12:13 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 4 ++ addons/hivedr/pom.xml | 5 +- .../java/org/apache/falcon/hive/HiveDRArgs.java | 3 +- .../java/org/apache/falcon/hive/HiveDRTool.java | 18 +++++- .../falcon/hive/mapreduce/CopyMapper.java | 11 ++++ .../org/apache/falcon/hive/util/EventUtils.java | 19 +++++- .../apache/falcon/hive/util/HiveDRUtils.java | 13 ++++ .../hive-disaster-recovery-secure-workflow.xml | 2 + .../hive-disaster-recovery-workflow.xml | 2 + .../falcon/job/HiveReplicationCounters.java | 62 ++++++++++++++++++++ .../apache/falcon/job/JobCountersHandler.java | 2 + .../java/org/apache/falcon/job/JobType.java | 3 +- 12 files changed, 139 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 69aa1f4..a2ca7f9 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,7 +9,11 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1480 Gather data transfer details of Hive DR. (Peeyush Bishnoi via Ajay Yadava) + FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao) + + FALCON-1480 Gather data transfer details of Hive DR(Peeyush Bishnoi via Ajay Yadava) FALCON-1588 Add ability to provide the path for recipe files in command line(Peeyush Bishnoi via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml index f98e8c4..346ffdb 100644 --- a/addons/hivedr/pom.xml +++ b/addons/hivedr/pom.xml @@ -99,7 +99,10 @@ <artifactId>falcon-hadoop-dependencies</artifactId> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-metrics</artifactId> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java index 574524d..5490232 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java @@ -78,7 +78,8 @@ public enum HiveDRArgs { FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false), KEEP_HISTORY("keepHistory", "Keep history of events file generated", false), - EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false); + EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false), + COUNTER_LOGDIR("counterLogDir", "Log directory to store counter file", false); private final String name; private final String description; http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java index df16c40..e141800 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java @@ -31,6 +31,9 @@ import org.apache.falcon.hive.util.FileUtils; import org.apache.falcon.hive.util.HiveDRStatusStore; import org.apache.falcon.hive.util.HiveDRUtils; import org.apache.falcon.hive.util.HiveMetastoreUtils; +import org.apache.falcon.job.JobCounters; +import org.apache.falcon.job.JobCountersHandler; +import org.apache.falcon.job.JobType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -40,6 +43,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; @@ -99,7 +103,19 @@ public class HiveDRTool extends Configured implements Tool { } try { - execute(); + Job job = execute(); + if ((job != null) && (inputOptions.getExecutionStage().equalsIgnoreCase( + HiveDRUtils.ExecutionStage.EXPORT.name()))) { + if ((job.getStatus().getState() == JobStatus.State.SUCCEEDED) + && (job.getConfiguration().get("counterLogDir") != null)) { + LOG.info("Obtaining job replication counters for Hive DR job"); + Path counterFile = new Path(job.getConfiguration().get("counterLogDir"), "counter.txt"); + JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType( + JobType.HIVEREPLICATION.name()); + hiveReplicationCounters.obtainJobCounters(job.getConfiguration(), job, false); + hiveReplicationCounters.storeJobCounters(job.getConfiguration(), counterFile); + } + } } catch (Exception e) { System.err.println("Exception encountered " + e.getMessage()); e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java index 5eb8acb..08e0551 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java @@ -22,6 +22,7 @@ import org.apache.falcon.hive.HiveDRArgs; import org.apache.falcon.hive.util.EventUtils; import org.apache.falcon.hive.util.HiveDRUtils; import org.apache.falcon.hive.util.ReplicationStatus; +import org.apache.falcon.job.ReplicationJobCountersList; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; @@ -70,6 +71,16 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> { context.write(new Text(rs.getJobName()), new Text(rs.toString())); } } + + // In case of export stage, populate custom counters + if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName()) + .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name()) + && !eventUtils.isCountersMapEmtpy()) { + context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment( + eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName())); + context.getCounter(ReplicationJobCountersList.COPY).increment( + eventUtils.getCounterValue(ReplicationJobCountersList.COPY.getName())); + } } protected void cleanup(Context context) throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java index f8397ff..d075bfb 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java @@ -27,11 +27,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hive.hcatalog.api.repl.Command; import org.apache.hive.hcatalog.api.repl.ReplicationUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,9 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; /** @@ -80,6 +83,8 @@ public class EventUtils { private Statement sourceStatement = null; private Statement targetStatement = null; + private Map<String, Long> countersMap = null; + private List<ReplicationStatus> listReplicationStatus; public EventUtils(Configuration conf) { @@ -97,6 +102,7 @@ public class EventUtils { targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName()); sourceCleanUpList = new ArrayList<Path>(); targetCleanUpList = new ArrayList<Path>(); + countersMap = new HashMap<>(); } public void setupConnection() throws Exception { @@ -312,6 +318,9 @@ public class EventUtils { Job distcpJob = distCp.execute(); LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString()); LOG.info("Completed DistCp"); + if (distcpJob.getStatus().getState() == JobStatus.State.SUCCEEDED) { + countersMap = HiveDRUtils.fetchReplicationCounters(conf, distcpJob); + } } public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) { @@ -338,6 +347,14 @@ public class EventUtils { return distcpOptions; } + public Long getCounterValue(String counterKey) { + return countersMap.get(counterKey); + } + + public boolean isCountersMapEmtpy() { + return countersMap.size() == 0 ? true : false; + } + public void cleanEventsDirectory() throws IOException { LOG.info("Cleaning staging directory"); cleanupEventLocations(sourceCleanUpList, sourceFileSystem); http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java index d9d6ab0..dff0803 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java @@ -18,13 +18,18 @@ package org.apache.falcon.hive.util; +import org.apache.falcon.job.JobCounters; +import org.apache.falcon.job.JobCountersHandler; +import org.apache.falcon.job.JobType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Shell; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.Map; /** * Hive replication utility class. @@ -83,4 +88,12 @@ public final class HiveDRUtils { } return path; } + + public static Map<String, Long> fetchReplicationCounters(Configuration conf, + Job job) throws IOException, InterruptedException { + JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType( + JobType.HIVEREPLICATION.name()); + hiveReplicationCounters.obtainJobCounters(conf, job, true); + return hiveReplicationCounters.getCountersMap(); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml index 74902b4..0494cf6 100644 --- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml @@ -244,6 +244,8 @@ <arg>${drJobName}-${nominalTime}</arg> <arg>-executionStage</arg> <arg>export</arg> + <arg>-counterLogDir</arg> + <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg> </java> <ok to="import-dr-replication"/> <error to="fail"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml index 72d40a3..296e049 100644 --- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml @@ -158,6 +158,8 @@ <arg>${drJobName}-${nominalTime}</arg> <arg>-executionStage</arg> <arg>export</arg> + <arg>-counterLogDir</arg> + <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg> </java> <ok to="import-dr-replication"/> <error to="fail"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java b/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java new file mode 100644 index 0000000..847ac34 --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.job; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Obtain and store Hive Replication counters. + */ +public class HiveReplicationCounters extends JobCounters { + private static final Logger LOG = LoggerFactory.getLogger(HiveReplicationCounters.class); + + public HiveReplicationCounters() { + super(); + } + + + protected void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException { + if (isDistCp) { + populateReplicationCountersMap(jobCounters); + } else { + populateCustomCountersMap(jobCounters); + } + } + + private void populateCustomCountersMap(Counters jobCounters) { + for (ReplicationJobCountersList counterVal : ReplicationJobCountersList.values()) { + if (counterVal == ReplicationJobCountersList.TIMETAKEN) { + continue; + } + + Counter counter = jobCounters.findCounter(counterVal); + if (counter != null) { + String counterName = counter.getName(); + long counterValue = counter.getValue(); + countersMap.put(counterName, counterValue); + } + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java index e8b68ff..391c4a2 100644 --- a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java +++ b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java @@ -32,6 +32,8 @@ public final class JobCountersHandler { public static JobCounters getCountersType(String jobType) { if (jobType.equals(JobType.FSREPLICATION.name())) { return new FSReplicationCounters(); + } else if (jobType.equals(JobType.HIVEREPLICATION.name())) { + return new HiveReplicationCounters(); } LOG.error("JobType is not supported:" + jobType); http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/JobType.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/job/JobType.java b/metrics/src/main/java/org/apache/falcon/job/JobType.java index 456e57f..ba45f76 100644 --- a/metrics/src/main/java/org/apache/falcon/job/JobType.java +++ b/metrics/src/main/java/org/apache/falcon/job/JobType.java @@ -22,5 +22,6 @@ package org.apache.falcon.job; * Types of the job for which counters need to obtain. */ public enum JobType { - FSREPLICATION + FSREPLICATION, + HIVEREPLICATION }
