Repository: falcon
Updated Branches:
  refs/heads/0.10 de422a2f2 -> ef00c3e41
FALCON-2075 Falcon HiveDR tasks do not report progress and can get killed

Author: Venkat Ranganathan <[email protected]>

Reviewers: "Praveen Adlakha <[email protected]>, Balu Vellanki <[email protected]>"

Closes #230 from vrangan/FALCON-2075

(cherry picked from commit ec4a273a8776ebdd1c50e062cad46c981b1d0122)
Signed-off-by: bvellanki <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ef00c3e4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ef00c3e4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ef00c3e4

Branch: refs/heads/0.10
Commit: ef00c3e41a041235f834d2f46ce5b3ff172dc60f
Parents: de422a2
Author: Venkat Ranganathan <[email protected]>
Authored: Tue Jul 19 13:23:20 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Tue Jul 19 13:23:28 2016 -0700

----------------------------------------------------------------------
 .../apache/falcon/hive/mapreduce/CopyMapper.java | 14 ++++++++++++--
 .../apache/falcon/hive/mapreduce/CopyReducer.java | 16 +++++++++++++++-
 2 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/ef00c3e4/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index e2297ef..5cd7e74 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Map class for Hive DR.
@@ -40,6 +42,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
     private EventUtils eventUtils;
+    ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -54,15 +57,22 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
 
     @Override
     protected void map(LongWritable key, Text value,
-                       Context context) throws IOException, InterruptedException {
+                       final Context context) throws IOException, InterruptedException {
         LOG.debug("Processing Event value: {}", value.toString());
-
+        timer = new ScheduledThreadPoolExecutor(1);
+        timer.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                System.out.println("Hive DR copy mapper progress heart beat");
+                context.progress();
+            }
+        }, 0, 30, TimeUnit.SECONDS);
         try {
             eventUtils.processEvents(value.toString());
         } catch (Exception e) {
             LOG.error("Exception in processing events:", e);
             throw new IOException(e);
         } finally {
+            timer.shutdownNow();
             cleanup(context);
         }
         List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();

http://git-wip-us.apache.org/repos/asf/falcon/blob/ef00c3e4/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
index 50cb4b2..f4bb31c 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -35,12 +35,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Reducer class for Hive DR.
  */
 public class CopyReducer extends Reducer<Text, Text, Text, Text> {
     private DRStatusStore hiveDRStore;
+    private ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -62,9 +65,18 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
     }
 
     @Override
-    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    protected void reduce(Text key, Iterable<Text> values, final Context context)
+        throws IOException, InterruptedException {
         List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
         ReplicationStatus rs;
+        timer = new ScheduledThreadPoolExecutor(1);
+        timer.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                System.out.println("Hive DR copy reducer progress heart beat");
+                context.progress();
+            }
+        }, 0, 30, TimeUnit.SECONDS);
+
         try {
             for (Text value : values) {
                 String[] fields = (value.toString()).split("\t");
@@ -76,6 +88,8 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
             hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList));
         } catch (HiveReplicationException e) {
             throw new IOException(e);
+        } finally {
+            timer.shutdownNow();
         }
     }
 
