Updated Branches: refs/heads/sqoop2 e8869ab34 -> 2c9a4eb46
SQOOP-671: Mapreduce counters are not used in generated mapreduce jobs (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2c9a4eb4 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2c9a4eb4 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2c9a4eb4 Branch: refs/heads/sqoop2 Commit: 2c9a4eb46c8e51834be946439b40e3116203581a Parents: e8869ab Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Nov 14 10:56:53 2012 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Nov 14 10:56:53 2012 -0800 ---------------------------------------------------------------------- .../sqoop/submission/counter/SqoopCounters.java | 25 +++++++++++++++ .../connector/jdbc/GenericJdbcImportExtractor.java | 9 +++++- .../java/org/apache/sqoop/job/mr/SqoopMapper.java | 4 ++- .../java/org/apache/sqoop/job/TestHdfsLoad.java | 5 +++ .../java/org/apache/sqoop/job/TestMapReduce.java | 5 +++ .../java/org/apache/sqoop/job/etl/Extractor.java | 13 ++++++++ 6 files changed, 59 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java new file mode 100644 index 0000000..75f3980 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.submission.counter; + +/** + * + */ +public enum SqoopCounters { + ROWS_READ; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java index 1b3fcff..b856ce6 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java @@ -32,6 +32,7 @@ public class GenericJdbcImportExtractor extends Extractor { public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class); + private long rowsRead = 0; @Override public void run(ImmutableContext context, Object connectionC, Object jobC, Partition partition, DataWriter writer) { String driver = context.getString( @@ -52,6 +53,7 @@ public class GenericJdbcImportExtractor extends Extractor { query = query.replace( GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); LOG.debug("Using query: " + query); + rowsRead = 0; ResultSet resultSet = executor.executeQuery(query); try { @@ -63,8 +65,8 @@ public class GenericJdbcImportExtractor extends Extractor { array[i] = resultSet.getObject(i+1); } writer.writeArrayRecord(array); + rowsRead++; } - } catch (SQLException e) { throw new SqoopException( GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e); @@ -74,4 +76,9 @@ public class GenericJdbcImportExtractor extends Extractor { } } + @Override + public long getRowsRead() { + return rowsRead; + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 1c0f3aa..fcedf52 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -31,6 +31,7 @@ import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.DataWriter; +import org.apache.sqoop.submission.counter.SqoopCounters; import org.apache.sqoop.utils.ClassUtils; /** @@ -75,7 +76,8 @@ public class SqoopMapper try { extractor.run(subContext, configConnection, configJob, split.getPartition(), new MapDataWriter(context)); - + context.getCounter(SqoopCounters.ROWS_READ) + .increment(extractor.getRowsRead()); } catch (Exception e) { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index 875a123..2287b06 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -230,5 +230,10 @@ public class TestHdfsLoad extends TestCase { writer.writeArrayRecord(array); } } + + @Override + public long getRowsRead() { + return NUMBER_OF_ROWS_PER_ID; + } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 6e49cc2..6dcf784 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -142,6 +142,11 @@ public class TestMapReduce extends TestCase { String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); } } + + @Override + public long getRowsRead() { + return NUMBER_OF_ROWS_PER_PARTITION; + } } public static class DummyOutputFormat http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java index ba04be9..e824b98 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java @@ -32,4 +32,17 @@ public abstract class Extractor { Partition partition, DataWriter writer); + /** + * Return the number of rows read by the last call to + * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) } + * method. This method returns only the number of rows read in the last call, + * and not a cumulative total of the number of rows read by this Extractor + * since its creation. If no calls were made to the run method, this method's + * behavior is undefined. + * + * @return the number of rows read by the last call to + * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) } + */ + public abstract long getRowsRead(); + }
