Repository: sqoop
Updated Branches:
  refs/heads/trunk 666700d33 -> 268299ee5


http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/java/org/apache/sqoop/tool/SqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/SqoopTool.java b/src/java/org/apache/sqoop/tool/SqoopTool.java
index dbe429a..5b8453d 100644
--- a/src/java/org/apache/sqoop/tool/SqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/SqoopTool.java
@@ -84,6 +84,8 @@ public abstract class SqoopTool {
         "Import a table from a database to HDFS");
     registerTool("import-all-tables", ImportAllTablesTool.class,
         "Import tables from a database to HDFS");
+    registerTool("import-mainframe", MainframeImportTool.class,
+            "Import datasets from a mainframe server to HDFS");
     registerTool("help", HelpTool.class, "List available commands");
     registerTool("list-databases", ListDatabasesTool.class,
         "List available databases on a server");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
new file mode 100644
index 0000000..eae7a63
--- /dev/null
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.net.PrintCommandListener;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPClientConfig;
+import org.apache.commons.net.ftp.FTPConnectionClosedException;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+
+/**
+ * Utility methods used when accessing a mainframe server through FTP client.
+ */
+/**
+ * Utility methods used when accessing a mainframe server through an FTP
+ * client.
+ */
+public final class MainframeFTPClientUtils {
+  private static final Log LOG = LogFactory.getLog(
+      MainframeFTPClientUtils.class.getName());
+
+  // When non-null, getFTPConnection() returns this instead of creating a
+  // real FTPClient. Used for unit testing only; see setMockFTPClient().
+  private static FTPClient mockFTPClient = null;
+
+  private MainframeFTPClientUtils() {
+  }
+
+  /**
+   * Lists the sequential datasets in a partitioned dataset on the server
+   * configured in {@code conf}.
+   *
+   * @param pdsName the partitioned dataset name (unquoted)
+   * @param conf configuration holding the connect string and credentials
+   * @return the member dataset names; possibly empty, never null
+   * @throws IOException if connecting or listing fails
+   */
+  public static List<String> listSequentialDatasets(
+      String pdsName, Configuration conf) throws IOException {
+    List<String> datasets = new ArrayList<String>();
+    FTPClient ftp = null;
+    try {
+      ftp = getFTPConnection(conf);
+      if (ftp != null) {
+        // Quote the name so MVS treats it as a fully-qualified dataset name.
+        ftp.changeWorkingDirectory("'" + pdsName + "'");
+        FTPFile[] ftpFiles = ftp.listFiles();
+        for (FTPFile f : ftpFiles) {
+          if (f.getType() == FTPFile.FILE_TYPE) {
+            datasets.add(f.getName());
+          }
+        }
+      }
+    } catch(IOException ioe) {
+      // Keep the original exception as the cause instead of flattening it
+      // into the message only.
+      throw new IOException("Could not list datasets from " + pdsName + ":"
+          + ioe.toString(), ioe);
+    } finally {
+      if (ftp != null) {
+        closeFTPConnection(ftp);
+      }
+    }
+    return datasets;
+  }
+
+  /**
+   * Opens and logs in an FTP connection using the connect string, user name
+   * and password found in {@code conf}. Falls back to anonymous login when
+   * no user name is configured.
+   *
+   * @param conf configuration holding URL, user name and password
+   * @return a connected, logged-in client in passive ASCII mode
+   * @throws IOException if the connection or login fails
+   */
+  public static FTPClient getFTPConnection(Configuration conf)
+      throws IOException {
+    FTPClient ftp = null;
+    try {
+      String username = conf.get(DBConfiguration.USERNAME_PROPERTY);
+      String password;
+      if (username == null) {
+        username = "anonymous";
+        password = "";
+      }
+      else {
+        // NOTE(review): assumes conf is always a JobConf here; a plain
+        // Configuration would raise a ClassCastException. Confirm callers.
+        password = DBConfiguration.getPassword((JobConf) conf);
+      }
+
+      String connectString = conf.get(DBConfiguration.URL_PROPERTY);
+      String server = connectString;
+      int port = 0;
+      String[] parts = connectString.split(":");
+      if (parts.length == 2) {
+        server = parts[0];
+        try {
+          port = Integer.parseInt(parts[1]);
+        } catch(NumberFormatException e) {
+          // Fall through with port == 0 so the default FTP port is used.
+          LOG.warn("Invalid port number: " + e.toString());
+        }
+      }
+
+      if (null != mockFTPClient) {
+        ftp = mockFTPClient;
+      } else {
+        ftp = new FTPClient();
+      }
+
+      // Configure the client for MVS-style directory listings.
+      FTPClientConfig config = new FTPClientConfig(FTPClientConfig.SYST_MVS);
+      ftp.configure(config);
+
+      if (conf.getBoolean(JobBase.PROPERTY_VERBOSE, false)) {
+        ftp.addProtocolCommandListener(new PrintCommandListener(
+            new PrintWriter(System.out), true));
+      }
+      try {
+        if (port > 0) {
+          ftp.connect(server, port);
+        } else {
+          ftp.connect(server);
+        }
+      } catch(IOException ioexp) {
+        throw new IOException("Could not connect to server " + server, ioexp);
+      }
+
+      int reply = ftp.getReplyCode();
+      if (!FTPReply.isPositiveCompletion(reply)) {
+        throw new IOException("FTP server " + server
+            + " refused connection:" + ftp.getReplyString());
+      }
+      LOG.info("Connected to " + server + " on "
+          + (port > 0 ? port : ftp.getDefaultPort()));
+      if (!ftp.login(username, password)) {
+        ftp.logout();
+        throw new IOException("Could not login to server " + server
+            + ":" + ftp.getReplyString());
+      }
+      // set ASCII transfer mode
+      ftp.setFileType(FTP.ASCII_FILE_TYPE);
+      // Use passive mode as default.
+      ftp.enterLocalPassiveMode();
+    } catch(IOException ioe) {
+      // Clean up the half-open connection before propagating the failure.
+      if (ftp != null && ftp.isConnected()) {
+        try {
+          ftp.disconnect();
+        } catch(IOException f) {
+          // Ignored: the original exception is more informative.
+        }
+      }
+      ftp = null;
+      throw ioe;
+    }
+    return ftp;
+  }
+
+  /**
+   * Logs out and disconnects the given client.
+   *
+   * @param ftp a connected client
+   * @return true if logout and disconnect both succeeded
+   */
+  public static boolean closeFTPConnection(FTPClient ftp) {
+    boolean success = true;
+    try {
+      ftp.noop(); // check that control connection is working OK
+      ftp.logout();
+    } catch(FTPConnectionClosedException e) {
+      success = false;
+      LOG.warn("Server closed connection: " + e.toString());
+    } catch(IOException e) {
+      success = false;
+      LOG.warn("I/O error while closing connection: " + e.toString());
+    } finally {
+      if (ftp.isConnected()) {
+        try {
+          ftp.disconnect();
+        } catch(IOException f) {
+          success = false;
+        }
+      }
+    }
+    return success;
+  }
+
+  // Used for testing only.
+  public static void setMockFTPClient(FTPClient ftpClient) {
+    mockFTPClient = ftpClient;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/manager/TestMainframeManager.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/TestMainframeManager.java b/src/test/org/apache/sqoop/manager/TestMainframeManager.java
new file mode 100644
index 0000000..79cbcb1
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/TestMainframeManager.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.accumulo.AccumuloUtil;
+import org.apache.sqoop.hbase.HBaseUtil;
+import org.apache.sqoop.tool.MainframeImportTool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.sqoop.ConnFactory;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.metastore.JobData;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Test methods of the generic SqlManager implementation.
+ */
+/**
+ * Test methods of the MainframeManager implementation, using a dummy
+ * connect string and table name (no real mainframe is contacted).
+ */
+public class TestMainframeManager extends BaseSqoopTestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestMainframeManager.class
+      .getName());
+
+  private ConnManager manager;
+
+  private SqoopOptions opts;
+
+  private ImportJobContext context;
+
+  @Before
+  public void setUp() {
+    Configuration conf = getConf();
+    opts = getSqoopOptions(conf);
+    opts.setConnectString("dummy.server");
+    opts.setTableName("dummy.pds");
+    opts.setConnManagerClassName("org.apache.sqoop.manager.MainframeManager");
+    context = new ImportJobContext(getTableName(), null, opts, null);
+    ConnFactory f = new ConnFactory(conf);
+    try {
+      this.manager = f.getManager(
+          new JobData(opts, new MainframeImportTool()));
+    } catch (IOException ioe) {
+      fail("IOException instantiating manager: "
+          + StringUtils.stringifyException(ioe));
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      manager.close();
+    } catch (SQLException sqlE) {
+      LOG.error("Got SQLException: " + sqlE.toString());
+      fail("Got SQLException: " + sqlE.toString());
+    }
+  }
+
+  @Test
+  public void testListColNames() {
+    String[] colNames = manager.getColumnNames(getTableName());
+    assertNotNull("manager should return a column list", colNames);
+    assertEquals("Column list should be length 1", 1, colNames.length);
+    assertEquals(MainframeManager.DEFAULT_DATASET_COLUMN_NAME, colNames[0]);
+  }
+
+  @Test
+  public void testListColTypes() {
+    Map<String, Integer> types = manager.getColumnTypes(getTableName());
+    assertNotNull("manager should return a column types map", types);
+    assertEquals("Column types map should be size 1", 1, types.size());
+    assertEquals(types.get(MainframeManager.DEFAULT_DATASET_COLUMN_NAME)
+        .intValue(), Types.VARCHAR);
+  }
+
+  @Test
+  public void testImportTableNoHBaseJarPresent() {
+    HBaseUtil.setAlwaysNoHBaseJarMode(true);
+    opts.setHBaseTable("dummy_table");
+    try {
+      manager.importTable(context);
+      fail("An ImportException should be thrown: "
+          + "HBase jars are not present in classpath, cannot import to "
+          + "HBase!");
+    } catch (ImportException e) {
+      // Expected message first, per JUnit's (expected, actual) convention.
+      // NOTE(review): compares against toString(); assumes ImportException
+      // renders as just its message -- confirm.
+      assertEquals("HBase jars are not present in classpath, cannot import "
+          + "to HBase!", e.toString());
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    } finally {
+      opts.setHBaseTable(null);
+    }
+  }
+
+  @Test
+  public void testImportTableNoAccumuloJarPresent() {
+    AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
+    opts.setAccumuloTable("dummy_table");
+    try {
+      manager.importTable(context);
+      fail("An ImportException should be thrown: "
+          + "Accumulo jars are not present in classpath, cannot import to "
+          + "Accumulo!");
+    } catch (ImportException e) {
+      // Expected message first, per JUnit's (expected, actual) convention.
+      assertEquals("Accumulo jars are not present in classpath, "
+          + "cannot import to Accumulo!", e.toString());
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    } finally {
+      opts.setAccumuloTable(null);
+    }
+  }
+
+  @Test
+  public void testListTables() {
+    String[] tables = manager.listTables();
+    assertNull("manager should not return a list of tables", tables);
+  }
+
+  @Test
+  public void testListDatabases() {
+    String[] databases = manager.listDatabases();
+    assertNull("manager should not return a list of databases", databases);
+  }
+
+  @Test
+  public void testGetPrimaryKey() {
+    String primaryKey = manager.getPrimaryKey(getTableName());
+    assertNull("manager should not return a primary key", primaryKey);
+  }
+
+  @Test
+  public void testReadTable() {
+    String[] colNames = manager.getColumnNames(getTableName());
+    try {
+      ResultSet table = manager.readTable(getTableName(), colNames);
+      assertNull("manager should not read a table", table);
+    } catch (SQLException sqlE) {
+      fail("Got SQLException: " + sqlE.toString());
+    }
+  }
+
+  @Test
+  public void testGetConnection() {
+    try {
+      Connection con = manager.getConnection();
+      assertNull("manager should not return a connection", con);
+    } catch (SQLException sqlE) {
+      fail("Got SQLException: " + sqlE.toString());
+    }
+  }
+
+  @Test
+  public void testGetDriverClass() {
+    String driverClass = manager.getDriverClass();
+    assertNotNull("manager should return a driver class", driverClass);
+    assertEquals("manager should return an empty driver class", "",
+        driverClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java
new file mode 100644
index 0000000..613ee7a
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.DBWritable;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.MainframeFTPClientUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+
+/**
+ * Tests MainframeDatasetFTPRecordReader against a mocked FTP client:
+ * two datasets ("test1", "test2") whose contents are newline-separated
+ * records served from in-memory streams.
+ */
+public class TestMainframeDatasetFTPRecordReader {
+
+  // Not referenced by these tests.
+  private MainframeImportJob mfImportJob;
+
+  // Not referenced by these tests.
+  private MainframeImportJob avroImportJob;
+
+  private MainframeDatasetInputSplit mfDIS;
+
+  private TaskAttemptContext context;
+
+  // Not referenced by these tests.
+  private MainframeDatasetRecordReader mfDRR;
+
+  private MainframeDatasetFTPRecordReader mfDFTPRR;
+
+  private FTPClient mockFTPClient;
+
+  /**
+   * Minimal SqoopRecord with a single String field, used as the
+   * record reader's value class in these tests.
+   */
+  public static class DummySqoopRecord extends SqoopRecord {
+    private String field;
+
+    public Map<String, Object> getFieldMap() {
+      Map<String, Object> map = new HashMap<String, Object>();
+      map.put("fieldName", field);
+      return map;
+    }
+
+    public void setField(String fieldName, Object fieldVal) {
+      // Non-String values are silently ignored.
+      if (fieldVal instanceof String) {
+        field = (String) fieldVal;
+      }
+    }
+
+    public void setField(final String val) {
+      this.field = val;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      field = in.readUTF();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(field);
+    }
+
+    @Override
+    public void readFields(ResultSet rs) throws SQLException {
+      field = rs.getString(1);
+    }
+
+    @Override
+    public void write(PreparedStatement s) throws SQLException {
+      s.setString(1, field);
+    }
+
+    @Override
+    public String toString() {
+      return field;
+    }
+
+    @Override
+    public int write(PreparedStatement stmt, int offset) throws SQLException {
+      return 0;
+    }
+
+    @Override
+    public String toString(DelimiterSet delimiters) {
+      return null;
+    }
+
+    @Override
+    public int getClassFormatVersion() {
+      return 0;
+    }
+
+    @Override
+    public int hashCode() {
+      // Assumes field is numeric, as in these tests ("123", "456", ...).
+      return Integer.parseInt(field);
+    }
+
+    public void loadLargeObjects(LargeObjectLoader loader) {
+    }
+
+    public void parse(CharSequence s) {
+    }
+
+    public void parse(Text s) {
+    }
+
+    public void parse(byte[] s) {
+    }
+
+    public void parse(char[] s) {
+    }
+
+    public void parse(ByteBuffer s) {
+    }
+
+    public void parse(CharBuffer s) {
+    }
+
+  }
+
+  /**
+   * Wires the mock FTP client into MainframeFTPClientUtils, stubs a
+   * two-dataset listing with in-memory contents, and builds the job
+   * configuration, split and record reader under test.
+   */
+  @Before
+  public void setUp() throws IOException {
+    mockFTPClient = mock(FTPClient.class);
+    MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(true);
+      when(mockFTPClient.completePendingCommand()).thenReturn(true);
+      when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+      when(mockFTPClient.noop()).thenReturn(200);
+      when(mockFTPClient.setFileType(anyInt())).thenReturn(true);
+
+      // "test1" holds two records, "test2" one, "NotComplete" one.
+      FTPFile ftpFile1 = new FTPFile();
+      ftpFile1.setType(FTPFile.FILE_TYPE);
+      ftpFile1.setName("test1");
+      FTPFile ftpFile2 = new FTPFile();
+      ftpFile2.setType(FTPFile.FILE_TYPE);
+      ftpFile2.setName("test2");
+      FTPFile[] ftpFiles = { ftpFile1, ftpFile2 };
+      when(mockFTPClient.listFiles()).thenReturn(ftpFiles);
+
+      when(mockFTPClient.retrieveFileStream("test1")).thenReturn(
+          new ByteArrayInputStream("123\n456\n".getBytes()));
+      when(mockFTPClient.retrieveFileStream("test2")).thenReturn(
+          new ByteArrayInputStream("789\n".getBytes()));
+      when(mockFTPClient.retrieveFileStream("NotComplete")).thenReturn(
+          new ByteArrayInputStream("NotComplete\n".getBytes()));
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    JobConf conf = new JobConf();
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:" + "11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+        "pssword".getBytes());
+    conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, DummySqoopRecord.class,
+        DBWritable.class);
+
+    Job job = Job.getInstance(conf);
+    mfDIS = new MainframeDatasetInputSplit();
+    mfDIS.addDataset("test1");
+    mfDIS.addDataset("test2");
+    context = mock(TaskAttemptContext.class);
+    when(context.getConfiguration()).thenReturn(job.getConfiguration());
+    mfDFTPRR = new MainframeDatasetFTPRecordReader();
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      mfDFTPRR.close();
+    } catch (IOException ioe) {
+      fail("Got IOException: " + ioe.toString());
+    }
+    // Make sure later tests do not pick up the mock.
+    MainframeFTPClientUtils.setMockFTPClient(null);
+  }
+
+  // Reads every record across both datasets, checking key, value and
+  // reported progress at each step, then expects end-of-input.
+  @Test
+  public void testReadAllData() {
+    try {
+      mfDFTPRR.initialize(mfDIS, context);
+      Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
+      Assert.assertEquals("Key should increase by records", 1, mfDFTPRR
+          .getCurrentKey().get());
+      Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR
+          .getCurrentValue().toString());
+      Assert.assertEquals("Get progress according to left dataset",
+          mfDFTPRR.getProgress(), (float) 0.5, 0.02);
+      Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
+      Assert.assertEquals("Key should increase by records", 2, mfDFTPRR
+          .getCurrentKey().get());
+      Assert.assertEquals("Read value by line and by dataset", "456", mfDFTPRR
+          .getCurrentValue().toString());
+      Assert.assertEquals("Get progress according to left dataset",
+          mfDFTPRR.getProgress(), (float) 0.5, 0.02);
+      Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
+      Assert.assertEquals("Key should increase by records", 3, mfDFTPRR
+          .getCurrentKey().get());
+      Assert.assertEquals("Read value by line and by dataset", "789", mfDFTPRR
+          .getCurrentValue().toString());
+      Assert.assertEquals("Get progress according to left dataset",
+          mfDFTPRR.getProgress(), (float) 1, 0.02);
+      Assert.assertFalse("End of dataset", mfDFTPRR.nextKeyValue());
+    } catch (IOException ioe) {
+      fail("Got IOException: " + ioe.toString());
+    } catch (InterruptedException ie) {
+      fail("Got InterruptedException: " + ie.toString());
+    }
+  }
+
+  // Reads only the first record and verifies mid-stream progress.
+  @Test
+  public void testReadPartOfData() {
+    try {
+      mfDFTPRR.initialize(mfDIS, context);
+      Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
+      Assert.assertEquals("Key should increase by records", 1, mfDFTPRR
+          .getCurrentKey().get());
+      Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR
+          .getCurrentValue().toString());
+      Assert.assertEquals("Get progress according to left dataset",
+          mfDFTPRR.getProgress(), (float) 0.5, 0.02);
+    } catch (IOException ioe) {
+      fail("Got IOException: " + ioe.toString());
+    } catch (InterruptedException ie) {
+      fail("Got InterruptedException: " + ie.toString());
+    }
+  }
+
+  // Forces completePendingCommand() to fail after the first record and
+  // expects the reader to surface it as an IOException with this message.
+  @Test
+  public void testFTPNotComplete() {
+    try {
+      mfDIS = new MainframeDatasetInputSplit();
+      mfDIS.addDataset("NotComplete");
+      mfDFTPRR.initialize(mfDIS, context);
+      Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue());
+      when(mockFTPClient.completePendingCommand()).thenReturn(false);
+      mfDFTPRR.nextKeyValue();
+    } catch (IOException ioe) {
+      Assert.assertEquals(
+          "java.io.IOException: IOException during data transfer: "
+              + "java.io.IOException: Failed to complete ftp command.",
+          ioe.toString());
+    } catch (InterruptedException ie) {
+      fail("Got InterruptedException: " + ie.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java
new file mode 100644
index 0000000..70958e0
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
+import org.apache.sqoop.util.MainframeFTPClientUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests split generation in MainframeDatasetInputFormat against a mocked
+ * FTP client that lists two datasets ("test1", "test2").
+ */
+public class TestMainframeDatasetInputFormat {
+
+  private MainframeDatasetInputFormat<SqoopRecord> format;
+
+  private FTPClient mockFTPClient;
+
+  @Before
+  public void setUp() {
+    format = new MainframeDatasetInputFormat<SqoopRecord>();
+    mockFTPClient = mock(FTPClient.class);
+    MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(true);
+      when(mockFTPClient.completePendingCommand()).thenReturn(true);
+      when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+      when(mockFTPClient.getReplyString()).thenReturn("");
+      when(mockFTPClient.noop()).thenReturn(200);
+      when(mockFTPClient.setFileType(anyInt())).thenReturn(true);
+
+      FTPFile ftpFile1 = new FTPFile();
+      ftpFile1.setType(FTPFile.FILE_TYPE);
+      ftpFile1.setName("test1");
+      FTPFile ftpFile2 = new FTPFile();
+      ftpFile2.setType(FTPFile.FILE_TYPE);
+      ftpFile2.setName("test2");
+      FTPFile[] ftpFiles = { ftpFile1, ftpFile2 };
+      when(mockFTPClient.listFiles()).thenReturn(ftpFiles);
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+  }
+
+  @After
+  public void tearDown() {
+    // Make sure later tests do not pick up the mock.
+    MainframeFTPClientUtils.setMockFTPClient(null);
+  }
+
+  @Test
+  public void testRetrieveDatasets() throws IOException {
+    JobConf conf = new JobConf();
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:12345");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+        "pssword".getBytes());
+
+    String dsName = "dsName1";
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
+    Job job = Job.getInstance(conf);
+
+    // One split per mocked dataset, in listing order. (The original patch
+    // called getSplits(job) twice and discarded the first result.)
+    List<InputSplit> splits = format.getSplits(job);
+    Assert.assertEquals("test1", ((MainframeDatasetInputSplit) splits.get(0))
+        .getNextDataset().toString());
+    Assert.assertEquals("test2", ((MainframeDatasetInputSplit) splits.get(1))
+        .getNextDataset().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java
new file mode 100644
index 0000000..5d92f6d
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link MainframeDatasetInputSplit}: dataset iteration,
+ * bookkeeping accessors, and Writable round-tripping.
+ */
+public class TestMainframeDatasetInputSplit {
+
+  private MainframeDatasetInputSplit mfDatasetInputSplit;
+
+  @Before
+  public void setUp() {
+    mfDatasetInputSplit = new MainframeDatasetInputSplit();
+  }
+
+  @Test
+  public void testGetCurrentDataset() {
+    // Nothing has been consumed yet, so there is no "current" dataset.
+    Assert.assertNull(mfDatasetInputSplit.getCurrentDataset());
+  }
+
+  @Test
+  public void testGetNextDatasetWithNull() {
+    // An empty split has no next dataset to hand out.
+    Assert.assertNull(mfDatasetInputSplit.getNextDataset());
+  }
+
+  @Test
+  public void testGetNextDataset() {
+    mfDatasetInputSplit.addDataset("test");
+    Assert.assertEquals("test", mfDatasetInputSplit.getNextDataset());
+  }
+
+  @Test
+  public void testHasMoreWithFalse() {
+    Assert.assertFalse(mfDatasetInputSplit.hasMore());
+  }
+
+  @Test
+  public void testHasMoreWithTrue() {
+    mfDatasetInputSplit.addDataset("test");
+    Assert.assertTrue(mfDatasetInputSplit.hasMore());
+  }
+
+  // IDIOM FIX (here and below): declare the checked exceptions on the test
+  // method instead of catch-and-fail. JUnit then reports the full stack
+  // trace of an unexpected exception rather than a bare fail() message.
+  @Test
+  public void testGetLength() throws IOException, InterruptedException {
+    mfDatasetInputSplit.addDataset("test");
+    Assert.assertEquals(1, mfDatasetInputSplit.getLength());
+  }
+
+  @Test
+  public void testGetLocations() throws IOException, InterruptedException {
+    Assert.assertNotNull(mfDatasetInputSplit.getLocations());
+  }
+
+  @Test
+  public void testWriteRead() throws IOException, InterruptedException {
+    // Round-trip the split through Writable serialization.
+    mfDatasetInputSplit.addDataset("dataSet1");
+    mfDatasetInputSplit.addDataset("dataSet2");
+    DataOutputBuffer dob = new DataOutputBuffer();
+    mfDatasetInputSplit.write(dob);
+
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(dob.getData(), dob.getLength());
+    MainframeDatasetInputSplit mfReader = new MainframeDatasetInputSplit();
+    mfReader.readFields(dib);
+
+    // The deserialized split must contain the same datasets in order.
+    // (Dropped the original's assertNotNull on mfReader: it was just
+    // constructed with 'new' and could never be null.)
+    Assert.assertEquals(2, mfReader.getLength());
+    Assert.assertEquals("dataSet1", mfReader.getNextDataset());
+    Assert.assertEquals("dataSet2", mfReader.getNextDataset());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
----------------------------------------------------------------------
diff --git 
a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java 
b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
new file mode 100644
index 0000000..ecaa8d5
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ImportJobContext;
+
+/**
+ * Verifies that MainframeImportJob selects the mainframe dataset mapper for
+ * the default (text) layout and defers to the superclass's mapper for Avro.
+ */
+public class TestMainframeImportJob {
+
+  private SqoopOptions options;
+
+  @Before
+  public void setUp() {
+    options = new SqoopOptions();
+  }
+
+  /**
+   * Invokes the protected MainframeImportJob.getMapperClass() on the given
+   * job via reflection, since it is not visible from this test package.
+   * (Extracted to remove the reflection boilerplate duplicated in both
+   * tests of the original.)
+   */
+  private static Class<?> invokeGetMapperClass(MainframeImportJob job)
+      throws NoSuchMethodException, IllegalAccessException,
+      InvocationTargetException {
+    Method getMapperClass =
+        MainframeImportJob.class.getDeclaredMethod("getMapperClass");
+    getMapperClass.setAccessible(true);
+    return (Class<?>) getMapperClass.invoke(job);
+  }
+
+  @Test
+  public void testGetMainframeDatasetImportMapperClass()
+      throws SecurityException, NoSuchMethodException,
+      IllegalArgumentException, IllegalAccessException,
+      InvocationTargetException {
+    ImportJobContext context = new ImportJobContext("dummyTableName",
+        "dummyJarFile", options, new Path("dummyPath"));
+    MainframeImportJob mfImportJob = new MainframeImportJob(options, context);
+
+    // CONVENTION FIX: JUnit's assertEquals takes (expected, actual); the
+    // original had them reversed, which garbles failure messages.
+    assertEquals(MainframeDatasetImportMapper.class,
+        invokeGetMapperClass(mfImportJob));
+  }
+
+  @Test
+  public void testSuperMapperClass() throws SecurityException,
+      NoSuchMethodException, IllegalArgumentException, IllegalAccessException,
+      InvocationTargetException {
+    // Avro layout is not handled by MainframeImportJob itself; the mapper
+    // choice falls through to the superclass.
+    options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
+    ImportJobContext context = new ImportJobContext("dummyTableName",
+        "dummyJarFile", options, new Path("dummyPath"));
+    MainframeImportJob avroImportJob = new MainframeImportJob(options, context);
+
+    assertEquals(org.apache.sqoop.mapreduce.AvroImportMapper.class,
+        invokeGetMapperClass(avroImportJob));
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java 
b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
new file mode 100644
index 0000000..936afd3
--- /dev/null
+++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.tool;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.cli.RelatedOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
+import com.cloudera.sqoop.cli.ToolOptions;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+
+/**
+ * Tests for MainframeImportTool: help output, registered options, and
+ * argument parsing side effects.
+ */
+public class TestMainframeImportTool extends BaseSqoopTestCase {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestMainframeImportTool.class.getName());
+
+  private MainframeImportTool mfImportTool;
+
+  // Captures everything printed to stdout so tests can assert on it.
+  private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+
+  // Saved in setUp() so tearDown() can put the real stdout back.
+  private PrintStream originalStdOut;
+
+  @Before
+  public void setUp() {
+    mfImportTool = new MainframeImportTool();
+    originalStdOut = System.out;
+    System.setOut(new PrintStream(outContent));
+  }
+
+  @After
+  public void tearDown() {
+    // BUG FIX: the original called System.setOut(null); any later use of
+    // System.out (other tests, the test runner itself) would then throw a
+    // NullPointerException. Restore the stream captured in setUp() instead.
+    System.setOut(originalStdOut);
+  }
+
+  @Test
+  public void testPrintHelp() {
+    ToolOptions toolOptions = new ToolOptions();
+    String separator = System.getProperty("line.separator");
+    mfImportTool.printHelp(toolOptions);
+    // NOTE(review): "preceed" below reproduces the (misspelled) text that
+    // the help printer actually emits; do not "correct" the expectation.
+    String outputMsg = "usage: sqoop "
+        + mfImportTool.getToolName()
+        + " [GENERIC-ARGS] [TOOL-ARGS]"
+        + separator
+        + ""
+        + separator
+        + ""
+        + separator
+        + "Generic Hadoop command-line arguments:"
+        + separator
+        + "(must preceed any tool-specific arguments)"
+        + separator
+        + "Generic options supported are"
+        + separator
+        + "-conf <configuration file>     specify an application configuration file"
+        + separator
+        + "-D <property=value>            use value for given property"
+        + separator
+        + "-fs <local|namenode:port>      specify a namenode"
+        + separator
+        + "-jt <local|jobtracker:port>    specify a job tracker"
+        + separator
+        + "-files <comma separated list of files>    "
+        + "specify comma separated files to be copied to the map reduce cluster"
+        + separator
+        + "-libjars <comma separated list of jars>    "
+        + "specify comma separated jar files to include in the classpath."
+        + separator
+        + "-archives <comma separated list of archives>    "
+        + "specify comma separated archives to be unarchived on the compute machines.\n"
+        + separator + "The general command line syntax is" + separator
+        + "bin/hadoop command [genericOptions] [commandOptions]\n" + separator
+        + "" + separator + "At minimum, you must specify --connect and --"
+        + MainframeImportTool.DS_ARG + separator;
+    assertEquals(outputMsg, outContent.toString());
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testGetImportOptions() throws SecurityException,
+      NoSuchMethodException, IllegalArgumentException, IllegalAccessException,
+      InvocationTargetException {
+    // getImportOptions() is protected, so reach it via reflection.
+    Class<?>[] types = {};
+    Object[] params = {};
+    Method getImportOptions = MainframeImportTool.class.getDeclaredMethod(
+        "getImportOptions", types);
+    getImportOptions.setAccessible(true);
+    RelatedOptions rOptions = (RelatedOptions) getImportOptions.invoke(
+        mfImportTool, params);
+    assertNotNull("It should return a RelatedOptions", rOptions);
+    // Every mainframe-import option must be registered.
+    assertTrue(rOptions.hasOption(MainframeImportTool.DS_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.DELETE_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.TARGET_DIR_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.WAREHOUSE_DIR_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.FMT_TEXTFILE_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.NUM_MAPPERS_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.MAPREDUCE_JOB_NAME));
+    assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESS_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESSION_CODEC_ARG));
+  }
+
+  @Test
+  public void testApplyOptions()
+      throws InvalidOptionsException, ParseException {
+    String[] args = { "--" + MainframeImportTool.DS_ARG, "dummy_ds" };
+    ToolOptions toolOptions = new ToolOptions();
+    SqoopOptions sqoopOption = new SqoopOptions();
+    mfImportTool.configureOptions(toolOptions);
+    sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+    // Specifying a dataset selects the mainframe connection manager and
+    // maps the dataset name onto the table name.
+    // CONVENTION FIX: assertEquals takes (expected, actual).
+    assertEquals("org.apache.sqoop.manager.MainframeManager",
+        sqoopOption.getConnManagerClassName());
+    assertEquals("dummy_ds", sqoopOption.getTableName());
+  }
+
+  @Test
+  public void testNotApplyOptions() throws ParseException,
+      InvalidOptionsException {
+    String[] args = new String[] { "--connection-manager=dummy_ClassName" };
+    ToolOptions toolOptions = new ToolOptions();
+    SqoopOptions sqoopOption = new SqoopOptions();
+    mfImportTool.configureOptions(toolOptions);
+    sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+    // An explicitly chosen connection manager must not be overridden.
+    assertEquals("dummy_ClassName", sqoopOption.getConnManagerClassName());
+    assertNull(sqoopOption.getTableName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java 
b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
new file mode 100644
index 0000000..6b89502
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for MainframeFTPClientUtils using a Mockito-mocked FTPClient so no
+ * real network connection is ever attempted.
+ */
+public class TestMainframeFTPClientUtils {
+
+  private JobConf conf;
+
+  private FTPClient mockFTPClient;
+
+  @Before
+  public void setUp() {
+    conf = new JobConf();
+    mockFTPClient = mock(FTPClient.class);
+    when(mockFTPClient.getReplyString()).thenReturn("");
+    MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
+  }
+
+  @After
+  public void tearDown() {
+    // Clear the static mock so other tests build real FTP clients.
+    MainframeFTPClientUtils.setMockFTPClient(null);
+  }
+
+  // NOTE(review): "Illegel" is a typo for "Illegal"; the name is kept to
+  // avoid churning test identifiers in this change.
+  @Test
+  public void testAnonymous_VERBOSE_IllegelPort() {
+    try {
+      when(mockFTPClient.login("anonymous", "")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    // Non-numeric port plus verbose mode: connecting must still succeed.
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:testPort");
+    conf.setBoolean(JobBase.PROPERTY_VERBOSE, true);
+
+    FTPClient ftp = null;
+    boolean success = false;
+    try {
+      ftp = MainframeFTPClientUtils.getFTPConnection(conf);
+    } catch (IOException ioe) {
+      fail("No IOException should be thrown!");
+    } finally {
+      success = MainframeFTPClientUtils.closeFTPConnection(ftp);
+    }
+    Assert.assertTrue(success);
+  }
+
+  @Test
+  public void testCannotConnect() {
+    try {
+      when(mockFTPClient.login("testUser", "")).thenReturn(false);
+    } catch (IOException ioe) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "testUser:11111");
+    try {
+      MainframeFTPClientUtils.getFTPConnection(conf);
+      // BUG FIX: the original passed vacuously when no exception was
+      // thrown; a refused connection must surface as an IOException.
+      fail("Expected an IOException for a refused connection!");
+    } catch (IOException ioe) {
+      Assert.assertEquals(
+          "java.io.IOException: FTP server testUser refused connection:",
+          ioe.toString());
+    }
+  }
+
+  @Test
+  public void testWrongUsername() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    FTPClient ftp = null;
+    // "userr" does not match the stubbed credentials, so login must fail.
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    // set the password in the secure credentials object
+    Text passwordSecretKey = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(passwordSecretKey,
+        "pssword".getBytes());
+
+    try {
+      ftp = MainframeFTPClientUtils.getFTPConnection(conf);
+      // BUG FIX: fail explicitly instead of passing vacuously when the
+      // expected login failure does not occur.
+      fail("Expected an IOException for a failed login!");
+    } catch (IOException ioe) {
+      Assert.assertEquals(
+          "java.io.IOException: Could not login to server localhost:",
+          ioe.toString());
+    }
+    Assert.assertNull(ftp);
+  }
+
+  @Test
+  public void testNotListDatasets() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    // set the password in the secure credentials object
+    Text passwordSecretKey = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(passwordSecretKey,
+        "pssword".getBytes());
+
+    try {
+      MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf);
+      // BUG FIX: listing must fail when the login fails; the original
+      // test passed silently if no exception was thrown.
+      fail("Expected an IOException when listing datasets fails!");
+    } catch (IOException ioe) {
+      Assert.assertEquals("java.io.IOException: "
+          + "Could not list datasets from pdsName:"
+          + "java.io.IOException: Could not login to server localhost:",
+          ioe.toString());
+    }
+  }
+}

Reply via email to