Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 22289d236 -> 7f53eb22e


SQOOP-1929: Sqoop2: Track number of records written in Loader

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/7f53eb22
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/7f53eb22
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/7f53eb22

Branch: refs/heads/sqoop2
Commit: 7f53eb22e348ef1b9171eedb45063e552ce0281d
Parents: 22289d2
Author: Abraham Elmahrek <[email protected]>
Authored: Mon Dec 22 18:14:23 2014 -0800
Committer: Abraham Elmahrek <[email protected]>
Committed: Mon Dec 22 18:14:23 2014 -0800

----------------------------------------------------------------------
 .../sqoop/submission/counter/SqoopCounters.java |   3 +-
 .../sqoop/connector/jdbc/GenericJdbcLoader.java |  28 +++--
 .../sqoop/connector/hdfs/HdfsExtractor.java     |   8 +-
 .../apache/sqoop/connector/hdfs/HdfsLoader.java |  15 ++-
 .../sqoop/connector/kafka/KafkaLoader.java      |  10 ++
 .../apache/sqoop/connector/kite/KiteLoader.java |  14 ++-
 execution/mapreduce/pom.xml                     |   6 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  59 ++++-----
 .../org/apache/sqoop/job/TestMapReduce.java     |  10 ++
 .../mr/TestSqoopOutputFormatLoadExecutor.java   | 123 +++++++++++++------
 .../java/org/apache/sqoop/job/etl/Loader.java   |  12 ++
 11 files changed, 201 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java 
b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java
index 75f3980..dd9dd68 100644
--- 
a/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java
+++ 
b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java
@@ -21,5 +21,6 @@ package org.apache.sqoop.submission.counter;
  *
  */
 public enum SqoopCounters {
-  ROWS_READ;
+  ROWS_READ,
+  ROWS_WRITTEN
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
 
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
index 6340a70..31fd876 100644
--- 
a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
+++ 
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
@@ -28,6 +28,7 @@ public class GenericJdbcLoader extends 
Loader<LinkConfiguration, ToJobConfigurat
   public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
   private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
   private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
+  private long rowsWritten = 0;
 
   @Override
   public void load(LoaderContext context, LinkConfiguration linkConfig, 
ToJobConfiguration toJobConfig) throws Exception{
@@ -41,27 +42,28 @@ public class GenericJdbcLoader extends 
Loader<LinkConfiguration, ToJobConfigurat
     String sql = 
context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
     executor.beginBatch(sql);
     try {
-      int numberOfRows = 0;
-      int numberOfBatches = 0;
+      int numberOfRowsPerBatch = 0;
+      int numberOfBatchesPerTransaction = 0;
       Object[] array;
 
       while ((array = context.getDataReader().readArrayRecord()) != null) {
-        numberOfRows++;
+        numberOfRowsPerBatch++;
         executor.addBatch(array);
 
-        if (numberOfRows == rowsPerBatch) {
-          numberOfBatches++;
-          if (numberOfBatches == batchesPerTransaction) {
+        if (numberOfRowsPerBatch == rowsPerBatch) {
+          numberOfBatchesPerTransaction++;
+          if (numberOfBatchesPerTransaction == batchesPerTransaction) {
             executor.executeBatch(true);
-            numberOfBatches = 0;
+            numberOfBatchesPerTransaction = 0;
           } else {
             executor.executeBatch(false);
           }
-          numberOfRows = 0;
+          numberOfRowsPerBatch = 0;
         }
+        rowsWritten ++;
       }
 
-      if (numberOfRows != 0 || numberOfBatches != 0) {
+      if (numberOfRowsPerBatch != 0 || numberOfBatchesPerTransaction != 0) {
         // execute and commit the remaining rows
         executor.executeBatch(true);
       }
@@ -73,4 +75,12 @@ public class GenericJdbcLoader extends 
Loader<LinkConfiguration, ToJobConfigurat
     }
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
+   */
+  @Override
+  public long getRowsWritten() {
+    return rowsWritten;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
 
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 3c417b4..9f3367b 100644
--- 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ 
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -48,7 +48,7 @@ public class HdfsExtractor extends 
Extractor<LinkConfiguration, FromJobConfigura
 
   private Configuration conf;
   private DataWriter dataWriter;
-  private long rowRead = 0;
+  private long rowsRead = 0;
 
   @Override
   public void extract(ExtractorContext context, LinkConfiguration 
linkConfiguration,
@@ -109,7 +109,7 @@ public class HdfsExtractor extends 
Extractor<LinkConfiguration, FromJobConfigura
     Text line = new Text();
     boolean hasNext = filereader.next(line);
     while (hasNext) {
-      rowRead++;
+      rowsRead++;
       if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
         dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, 
fromJobConfiguration, line.toString()));
       } else {
@@ -176,7 +176,7 @@ public class HdfsExtractor extends 
Extractor<LinkConfiguration, FromJobConfigura
       } else {
         next = fileseeker.getPos();
       }
-      rowRead++;
+      rowsRead++;
       if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
         dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, 
fromJobConfiguration, line.toString()));
       } else {
@@ -189,7 +189,7 @@ public class HdfsExtractor extends 
Extractor<LinkConfiguration, FromJobConfigura
 
   @Override
   public long getRowsRead() {
-    return rowRead;
+    return rowsRead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
 
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 05b0230..556c112 100644
--- 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ 
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -38,6 +38,9 @@ import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
+
+  private long rowsWritten = 0;
+
   /**
    * Load data to target.
    *
@@ -79,19 +82,21 @@ public class HdfsLoader extends Loader<LinkConfiguration, 
ToJobConfiguration> {
 
       GenericHdfsWriter filewriter = getWriter(toJobConfig);
 
-      filewriter.initialize(filepath,conf,codec);
+      filewriter.initialize(filepath, conf, codec);
 
       if (HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig)) {
         Object[] record;
 
         while ((record = reader.readArrayRecord()) != null) {
           filewriter.write(HdfsUtils.formatRecord(linkConfiguration, 
toJobConfig, record));
+          rowsWritten++;
         }
       } else {
         String record;
 
         while ((record = reader.readTextRecord()) != null) {
           filewriter.write(record);
+          rowsWritten++;
         }
       }
       filewriter.destroy();
@@ -142,4 +147,12 @@ public class HdfsLoader extends Loader<LinkConfiguration, 
ToJobConfiguration> {
     return codec.getDefaultExtension();
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
+   */
+  @Override
+  public long getRowsWritten() {
+    return rowsWritten;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
index 5d79516..1c08f60 100644
--- 
a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
+++ 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
@@ -38,6 +38,7 @@ public class KafkaLoader extends 
Loader<LinkConfiguration,ToJobConfiguration> {
   private List<KeyedMessage<String, String>> messageList =
           new ArrayList<KeyedMessage<String, 
String>>(KafkaConstants.DEFAULT_BATCH_SIZE);
   private Producer producer;
+  private long rowsWritten = 0;
 
   @Override
   public void load(LoaderContext context,LinkConfiguration linkConfiguration, 
ToJobConfiguration jobConfiguration) throws
@@ -58,6 +59,7 @@ public class KafkaLoader extends 
Loader<LinkConfiguration,ToJobConfiguration> {
       if (messageList.size() >= KafkaConstants.DEFAULT_BATCH_SIZE) {
         sendToKafka(messageList);
       }
+      rowsWritten ++;
     }
 
     if (messageList.size() > 0) {
@@ -103,4 +105,12 @@ public class KafkaLoader extends 
Loader<LinkConfiguration,ToJobConfiguration> {
     
props.put(KafkaConstants.PRODUCER_TYPE,KafkaConstants.DEFAULT_PRODUCER_TYPE);
     return props;
   }
+
+  /* (non-Javadoc)
+   * @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
+   */
+  @Override
+  public long getRowsWritten() {
+    return rowsWritten;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
 
b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
index 709fd94..0a46f4a 100644
--- 
a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
+++ 
b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
@@ -34,6 +34,7 @@ public class KiteLoader extends Loader<LinkConfiguration, 
ToJobConfiguration> {
 
   private static final Logger LOG = Logger.getLogger(KiteLoader.class);
 
+  private long rowsWritten = 0;
   @VisibleForTesting
   protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
       FileFormat format) {
@@ -57,14 +58,13 @@ public class KiteLoader extends Loader<LinkConfiguration, 
ToJobConfiguration> {
     DataReader reader = context.getDataReader();
     Object[] array;
     boolean success = false;
-    long count = 0L;
 
     try {
       while ((array = reader.readArrayRecord()) != null) {
         executor.writeRecord(array);
-        count++;
+        rowsWritten++;
       }
-      LOG.info(count + " data record(s) have been written into dataset.");
+      LOG.info(rowsWritten + " data record(s) have been written into 
dataset.");
       success = true;
     } finally {
       executor.closeWriter();
@@ -76,4 +76,12 @@ public class KiteLoader extends Loader<LinkConfiguration, 
ToJobConfiguration> {
     }
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
+   */
+  @Override
+  public long getRowsWritten() {
+    return rowsWritten;
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index b23b905..ad7f489 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -39,7 +39,11 @@ limitations under the License.
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-core</artifactId>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index aaf771c..7835e38 100644
--- 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -42,7 +42,7 @@ import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.etl.io.DataReader;
-import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -60,20 +60,20 @@ public class SqoopOutputFormatLoadExecutor {
   private Future<?> consumerFuture;
   private Semaphore filled = new Semaphore(0, true);
   private Semaphore free = new Semaphore(1, true);
-  private volatile boolean isTest = false;
   private String loaderName;
 
   // NOTE: This method is only exposed for test cases
-  SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName, 
IntermediateDataFormat<?> idf) {
-    this.isTest = isTest;
+  SqoopOutputFormatLoadExecutor(JobContext jobctx, String loaderName, 
IntermediateDataFormat<?> toDataFormat, Matcher matcher) {
+    context = jobctx;
     this.loaderName = loaderName;
-    toDataFormat = idf;
+    this.matcher = matcher;
+    this.toDataFormat = toDataFormat;
     writer = new SqoopRecordWriter();
-    matcher = null;
   }
 
   public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
     context = jobctx;
+    loaderName = context.getConfiguration().get(MRJobConstants.JOB_ETL_LOADER);
     writer = new SqoopRecordWriter();
     matcher = MatcherFactory.getMatcher(
         MRConfigurationUtils.getConnectorSchema(Direction.FROM, 
context.getConfiguration()),
@@ -87,12 +87,12 @@ public class SqoopOutputFormatLoadExecutor {
   public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
     consumerFuture = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setNameFormat
         ("OutputFormatLoader-consumer").build()).submit(
-            new ConsumerThread());
+            new ConsumerThread(context));
     return writer;
   }
 
   /*
-   * This is a producer-consumer problem and can be solved
+   * This is a producer-consumer problem and can be solved
    * with two semaphores.
    */
   private class SqoopRecordWriter extends RecordWriter<SqoopWritable, 
NullWritable> {
@@ -215,40 +215,43 @@ public class SqoopOutputFormatLoadExecutor {
 
   private class ConsumerThread implements Runnable {
 
+    /**
+     * Context class that we should use for reporting counters.
+     */
+    private final JobContext jobctx;
+
+    public ConsumerThread(final JobContext context) {
+      jobctx = context;
+    }
+
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public void run() {
       LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
       try {
         DataReader reader = new SqoopOutputFormatDataReader();
-
-        Configuration conf = null;
-        if (!isTest) {
-          conf = context.getConfiguration();
-          loaderName = conf.get(MRJobConstants.JOB_ETL_LOADER);
-        }
+        Configuration conf = context.getConfiguration();
         Loader loader = (Loader) ClassUtils.instantiate(loaderName);
 
-        // Objects that should be pass to the Executor execution
-        PrefixContext subContext = null;
-        Object connectorLinkConfig = null;
-        Object connectorToJobConfig = null;
-        Schema schema = null;
-
-        if (!isTest) {
-          // Using the TO schema since the SqoopDataWriter in the SqoopMapper 
encapsulates the toDataFormat
-          schema = matcher.getToSchema();
-          subContext = new PrefixContext(conf, 
MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
-          connectorLinkConfig = 
MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
-          connectorToJobConfig = 
MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
-        }
+        // Objects that should be passed to the Loader
+        PrefixContext subContext = new PrefixContext(conf,
+            MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+        Object connectorLinkConfig = MRConfigurationUtils
+            .getConnectorLinkConfig(Direction.TO, conf);
+        Object connectorToJobConfig = MRConfigurationUtils
+            .getConnectorJobConfig(Direction.TO, conf);
+        // The LoaderContext below is given the TO schema since the SqoopDataWriter
+        // in the SqoopMapper encapsulates the toDataFormat
 
         // Create loader context
-        LoaderContext loaderContext = new LoaderContext(subContext, reader, 
schema);
+        LoaderContext loaderContext = new LoaderContext(subContext, reader, 
matcher.getToSchema());
 
         LOG.info("Running loader class " + loaderName);
         loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
         LOG.info("Loader has finished");
+        ((TaskAttemptContext) 
jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
+            loader.getRowsWritten());
+
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Error while loading data out of MR job.", t);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 47696cc..cc0a3cc 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -251,6 +251,7 @@ public class TestMapReduce {
   public static class DummyLoader extends Loader<EmptyConfiguration, 
EmptyConfiguration> {
     private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
     private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+    private long rowsWritten = 0;
 
     @Override
     public void load(LoaderContext context, EmptyConfiguration oc, 
EmptyConfiguration oj)
@@ -260,9 +261,18 @@ public class TestMapReduce {
         String testData = "" + index + "," +  (double) index + ",'" + 
String.valueOf(index) + "'";
         dataFormat.setCSVTextData(testData);
         index++;
+        rowsWritten ++;
         assertEquals(dataFormat.getCSVTextData().toString(), data);
       }
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
+     */
+    @Override
+    public long getRowsWritten() {
+      return rowsWritten;
+    }
   }
 
   public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, 
EmptyConfiguration> {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index ec0e886..f5f627d 100644
--- 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -18,50 +18,63 @@
  */
 package org.apache.sqoop.job.mr;
 
-import java.util.ConcurrentModificationException;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.matcher.Matcher;
+import org.apache.sqoop.connector.matcher.MatcherFactory;
 import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.util.MRJobTestUtil;
+import org.apache.sqoop.schema.NullSchema;
+import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeUnit;
+
 public class TestSqoopOutputFormatLoadExecutor {
 
   private Configuration conf;
+  private TaskAttemptContext jobContextMock;
 
-  public static class ThrowingLoader extends Loader {
-
-    public ThrowingLoader() {
-
-    }
+  public static class ThrowingLoader extends Loader<Object, Object> {
 
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws 
Exception {
       context.getDataReader().readTextRecord();
       throw new BrokenBarrierException();
     }
+
+    @Override
+    public long getRowsWritten() {
+      return 0;
+    }
   }
 
-  public static class ThrowingContinuousLoader extends Loader {
+  public static class ThrowingContinuousLoader extends Loader<Object, Object> {
 
+    private long rowsWritten = 0;
     public ThrowingContinuousLoader() {
     }
 
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws 
Exception {
-      int runCount = 0;
       Object o;
       String[] arr;
       while ((o = context.getDataReader().readTextRecord()) != null) {
@@ -70,20 +83,20 @@ public class TestSqoopOutputFormatLoadExecutor {
         for (int i = 0; i < arr.length; i++) {
           Assert.assertEquals(i, Integer.parseInt(arr[i]));
         }
-        runCount++;
-        if (runCount == 5) {
+        rowsWritten++;
+        if (rowsWritten == 5) {
           throw new ConcurrentModificationException();
         }
       }
     }
-  }
-
-  public static class GoodLoader extends Loader {
-
-    public GoodLoader() {
 
+    @Override
+    public long getRowsWritten() {
+      return rowsWritten;
     }
+  }
 
+  public static class GoodLoader extends Loader<Object, Object> {
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws 
Exception {
       String[] arr = 
context.getDataReader().readTextRecord().toString().split(",");
@@ -92,17 +105,20 @@ public class TestSqoopOutputFormatLoadExecutor {
         Assert.assertEquals(i, Integer.parseInt(arr[i]));
       }
     }
-  }
 
-  public static class GoodContinuousLoader extends Loader {
+    @Override
+    public long getRowsWritten() {
+      return 0;
+    }
+  }
 
-    public GoodContinuousLoader() {
+  public static class GoodContinuousLoader extends Loader<Object, Object> {
 
-    }
+    private long rowsWritten = 0;
 
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws 
 Exception {
-      int runCount = 0;
+      rowsWritten = 0; // assign the field: a local here would shadow it and getRowsWritten() would stay 0
       Object o;
       String[] arr;
       while ((o = context.getDataReader().readTextRecord()) != null) {
@@ -111,26 +127,47 @@ public class TestSqoopOutputFormatLoadExecutor {
         for (int i = 0; i < arr.length; i++) {
           Assert.assertEquals(i, Integer.parseInt(arr[i]));
         }
-        runCount++;
+        rowsWritten++;
       }
-      Assert.assertEquals(10, runCount);
+      Assert.assertEquals(10, rowsWritten);
+    }
+
+    @Override
+    public long getRowsWritten() {
+      return rowsWritten;
     }
   }
 
+  // TODO:SQOOP-1873: Mock objects instead
+  private Matcher getMatcher(){
+    return MatcherFactory.getMatcher(NullSchema.getInstance(),
+        NullSchema.getInstance());
+
+  }
+  // TODO:SQOOP-1873: Mock objects instead
+  private IntermediateDataFormat<?> getIDF(){
+    return new CSVIntermediateDataFormat();
+  }
 
   @Before
   public void setUp() {
     conf = new Configuration();
-    conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, 
CSVIntermediateDataFormat.class.getName());
-
+    conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+    jobContextMock = mock(TaskAttemptContext.class);
+    GenericCounter counter = new GenericCounter("test", "test-me");
+    when(((TaskAttemptContext) 
jobContextMock).getCounter(SqoopCounters.ROWS_WRITTEN)).thenReturn(counter);
+    org.apache.hadoop.mapred.JobConf testConf = new 
org.apache.hadoop.mapred.JobConf();
+    when(jobContextMock.getConfiguration()).thenReturn(testConf);
   }
 
   @Test(expected = BrokenBarrierException.class)
   public void testWhenLoaderThrows() throws Throwable {
     conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
-    SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName(), 
new CSVIntermediateDataFormat());
-    RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
+    SqoopOutputFormatLoadExecutor executor = new 
SqoopOutputFormatLoadExecutor(jobContextMock,
+        ThrowingLoader.class.getName(), getIDF(), getMatcher());
+    RecordWriter<SqoopWritable, NullWritable> writer = executor
+        .getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
@@ -146,8 +183,9 @@ public class TestSqoopOutputFormatLoadExecutor {
   @Test
   public void testSuccessfulContinuousLoader() throws Throwable {
     conf.set(MRJobConstants.JOB_ETL_LOADER, 
GoodContinuousLoader.class.getName());
-    SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, 
GoodContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
+
+    SqoopOutputFormatLoadExecutor executor = new 
SqoopOutputFormatLoadExecutor(jobContextMock,
+        GoodContinuousLoader.class.getName(), getIDF(), getMatcher());
     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);
@@ -163,13 +201,16 @@ public class TestSqoopOutputFormatLoadExecutor {
       writer.write(writable, null);
     }
     writer.close(null);
+    verify(jobContextMock, times(1)).getConfiguration();
+    verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN);
   }
 
-  @Test (expected = SqoopException.class)
+  @Test(expected = SqoopException.class)
   public void testSuccessfulLoader() throws Throwable {
-    SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName(), new 
CSVIntermediateDataFormat());
-    RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
+    SqoopOutputFormatLoadExecutor executor = new 
SqoopOutputFormatLoadExecutor(jobContextMock,
+        GoodLoader.class.getName(), getIDF(), getMatcher());
+    RecordWriter<SqoopWritable, NullWritable> writer = executor
+        .getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);
     StringBuilder builder = new StringBuilder();
@@ -182,18 +223,19 @@ public class TestSqoopOutputFormatLoadExecutor {
     dataFormat.setCSVTextData(builder.toString());
     writer.write(writable, null);
 
-    //Allow writer to complete.
+    // Allow writer to complete.
     TimeUnit.SECONDS.sleep(5);
     writer.close(null);
+    verify(jobContextMock, times(1)).getConfiguration();
+    verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN);
   }
 
-
   @Test(expected = ConcurrentModificationException.class)
   public void testThrowingContinuousLoader() throws Throwable {
     conf.set(MRJobConstants.JOB_ETL_LOADER, 
ThrowingContinuousLoader.class.getName());
-    SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, 
ThrowingContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
-    RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
+    SqoopOutputFormatLoadExecutor executor = new 
SqoopOutputFormatLoadExecutor(jobContextMock,
+        ThrowingContinuousLoader.class.getName(), getIDF(), getMatcher());
+  RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
@@ -213,4 +255,5 @@ public class TestSqoopOutputFormatLoadExecutor {
       throw ex.getCause();
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7f53eb22/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java 
b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index 3b6bd71..e47b244 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -33,4 +33,16 @@ public abstract class Loader<LinkConfiguration, 
ToJobConfiguration> {
   public abstract void load(LoaderContext context, LinkConfiguration 
linkConfiguration,
       ToJobConfiguration jobConfiguration) throws Exception;
 
+  /**
+   * Return the number of rows written by the last call to
+   * {@linkplain Loader#load(org.apache.sqoop.job.etl.LoaderContext, 
java.lang.Object, java.lang.Object) }
+   * method. This method returns only the number of rows written in the last 
call,
+   * and not a cumulative total of the number of rows written by this Loader
+   * since its creation.
+   *
+   * @return the number of rows written by the last call to
+   * {@linkplain Loader#load(org.apache.sqoop.job.etl.LoaderContext, 
java.lang.Object, java.lang.Object) }
+   */
+  public abstract long getRowsWritten();
+
 }

Reply via email to