Updated Branches: refs/heads/sqoop2 073d37c72 -> 3f574f981
SQOOP-863: Sqoop2: Introduce ProgressThread into Extractor and Loader (Jarek Jarcec Cecho via Kate Ting) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3f574f98 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3f574f98 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3f574f98 Branch: refs/heads/sqoop2 Commit: 3f574f9813edaa049bc833b8843c3039e75f2dbc Parents: 073d37c Author: Kate Ting <[email protected]> Authored: Sun Apr 7 00:58:27 2013 -0400 Committer: Kate Ting <[email protected]> Committed: Sun Apr 7 00:58:27 2013 -0400 ---------------------------------------------------------------------- .../org/apache/sqoop/job/mr/ProgressRunnable.java | 47 +++++++++++++++ .../java/org/apache/sqoop/job/mr/SqoopMapper.java | 24 ++++++- .../java/org/apache/sqoop/job/mr/SqoopReducer.java | 33 +++++++++- 3 files changed, 96 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/3f574f98/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java new file mode 100644 index 0000000..7e87250 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.mr; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + + +/** + * Runnable that will ping mapreduce context about progress. + */ +public class ProgressRunnable implements Runnable { + + public static final Log LOG = LogFactory.getLog(ProgressRunnable.class); + + /** + * Context class that we should use for reporting progress. + */ + private final TaskInputOutputContext context; + + public ProgressRunnable(final TaskInputOutputContext ctxt) { + this.context = ctxt; + } + + @Override + public void run() { + LOG.debug("Auto-progress thread reporting progress"); + this.context.progress(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3f574f98/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 2a82303..7715d5f 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -18,6 +18,9 @@ package org.apache.sqoop.job.mr; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,11 +41,14 @@ import org.apache.sqoop.utils.ClassUtils; /** * A mapper to perform map function. */ -public class SqoopMapper - extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> { +public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> { - public static final Log LOG = - LogFactory.getLog(SqoopMapper.class.getName()); + public static final Log LOG = LogFactory.getLog(SqoopMapper.class); + + /** + * Service for reporting progress to mapreduce. + */ + private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); @Override public void run(Context context) throws IOException, InterruptedException { @@ -76,6 +82,9 @@ public class SqoopMapper ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context)); try { + LOG.info("Starting progress service"); + progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES); + LOG.info("Running extractor class " + extractorName); extractor.extract(extractorContext, configConnection, configJob, split.getPartition()); LOG.info("Extractor has finished"); @@ -83,6 +92,13 @@ public class SqoopMapper .increment(extractor.getRowsRead()); } catch (Exception e) { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); + } finally { + LOG.info("Stopping progress service"); + progressService.shutdown(); + if(!progressService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.info("Stopping progress service with shutdownNow"); + progressService.shutdownNow(); + } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3f574f98/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java index d236148..e4ad6ba 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java @@ -23,13 +23,38 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.sqoop.job.io.Data; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * A reducer to perform reduce function. */ -public class SqoopReducer - extends Reducer<Data, NullWritable, Data, NullWritable> { +public class SqoopReducer extends Reducer<Data, NullWritable, Data, NullWritable> { + + public static final Log LOG = LogFactory.getLog(SqoopReducer.class); + + /** + * Service for reporting progress to mapreduce. + */ + private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); - public static final Log LOG = - LogFactory.getLog(SqoopReducer.class.getName()); + @Override + public void run(Context context) throws IOException, InterruptedException { + try { + LOG.info("Starting progress service"); + progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES); + // Delegating all functionality to our parent + super.run(context); + } finally { + LOG.info("Stopping progress service"); + progressService.shutdown(); + if(!progressService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.info("Stopping progress service with shutdownNow"); + progressService.shutdownNow(); + } + } + } }
