Repository: sqoop Updated Branches: refs/heads/trunk 666700d33 -> 268299ee5
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/java/org/apache/sqoop/tool/SqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/SqoopTool.java b/src/java/org/apache/sqoop/tool/SqoopTool.java index dbe429a..5b8453d 100644 --- a/src/java/org/apache/sqoop/tool/SqoopTool.java +++ b/src/java/org/apache/sqoop/tool/SqoopTool.java @@ -84,6 +84,8 @@ public abstract class SqoopTool { "Import a table from a database to HDFS"); registerTool("import-all-tables", ImportAllTablesTool.class, "Import tables from a database to HDFS"); + registerTool("import-mainframe", MainframeImportTool.class, + "Import datasets from a mainframe server to HDFS"); registerTool("help", HelpTool.class, "List available commands"); registerTool("list-databases", ListDatabasesTool.class, "List available databases on a server"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java new file mode 100644 index 0000000..eae7a63 --- /dev/null +++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.net.PrintCommandListener; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPClientConfig; +import org.apache.commons.net.ftp.FTPConnectionClosedException; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.FTPReply; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.sqoop.mapreduce.JobBase; +import org.apache.sqoop.mapreduce.db.DBConfiguration; + +/** + * Utility methods used when accessing a mainframe server through FTP client. + */ +public final class MainframeFTPClientUtils { + private static final Log LOG = LogFactory.getLog( + MainframeFTPClientUtils.class.getName()); + + private static FTPClient mockFTPClient = null; // Used for unit testing + + private MainframeFTPClientUtils() { + } + + public static List<String> listSequentialDatasets( + String pdsName, Configuration conf) throws IOException { + List<String> datasets = new ArrayList<String>(); + FTPClient ftp = null; + try { + ftp = getFTPConnection(conf); + if (ftp != null) { + ftp.changeWorkingDirectory("'" + pdsName + "'"); + FTPFile[] ftpFiles = ftp.listFiles(); + for (FTPFile f : ftpFiles) { + if (f.getType() == FTPFile.FILE_TYPE) { + datasets.add(f.getName()); + } + } + } + } catch(IOException ioe) { + throw new IOException ("Could not list datasets from " + pdsName + ":" + + ioe.toString()); + } finally { + if (ftp != null) { + closeFTPConnection(ftp); + } + } + return datasets; + } + + public static FTPClient getFTPConnection(Configuration conf) + throws IOException { + FTPClient ftp = null; + try { + String username = conf.get(DBConfiguration.USERNAME_PROPERTY); + String password; + if (username == null) { + username = "anonymous"; + password = ""; + } + else { + password = DBConfiguration.getPassword((JobConf) conf); + } + + String connectString = conf.get(DBConfiguration.URL_PROPERTY); + String server = connectString; + int port = 0; + String[] parts = connectString.split(":"); + if (parts.length == 2) { + server = parts[0]; + try { + port = Integer.parseInt(parts[1]); + } catch(NumberFormatException e) { + LOG.warn("Invalid port number: " + e.toString()); + } + } + + if (null != mockFTPClient) { + ftp = mockFTPClient; + } else { + ftp = new FTPClient(); + } + + FTPClientConfig config = new FTPClientConfig(FTPClientConfig.SYST_MVS); + ftp.configure(config); + + if (conf.getBoolean(JobBase.PROPERTY_VERBOSE, false)) { + ftp.addProtocolCommandListener(new PrintCommandListener( + new PrintWriter(System.out), true)); + } + try { + if (port > 0) { + ftp.connect(server, port); + } else { + ftp.connect(server); + } + } catch(IOException ioexp) { + throw new IOException("Could not connect to server " + server, ioexp); + } + + int reply = ftp.getReplyCode(); + if (!FTPReply.isPositiveCompletion(reply)) { + throw new IOException("FTP server " + server + + " refused connection:" + ftp.getReplyString()); + } + LOG.info("Connected to " + server + " on " + + (port>0 ? port : ftp.getDefaultPort())); + if (!ftp.login(username, password)) { + ftp.logout(); + throw new IOException("Could not login to server " + server + + ":" + ftp.getReplyString()); + } + // set ASCII transfer mode + ftp.setFileType(FTP.ASCII_FILE_TYPE); + // Use passive mode as default. + ftp.enterLocalPassiveMode(); + } catch(IOException ioe) { + if (ftp != null && ftp.isConnected()) { + try { + ftp.disconnect(); + } catch(IOException f) { + // do nothing + } + } + ftp = null; + throw ioe; + } + return ftp; + } + + public static boolean closeFTPConnection(FTPClient ftp) { + boolean success = true; + try { + ftp.noop(); // check that control connection is working OK + ftp.logout(); + } catch(FTPConnectionClosedException e) { + success = false; + LOG.warn("Server closed connection: " + e.toString()); + } catch(IOException e) { + success = false; + LOG.warn("Server closed connection: " + e.toString()); + } finally { + if (ftp.isConnected()) { + try { + ftp.disconnect(); + } catch(IOException f) { + success = false; + } + } + } + return success; + } + + // Used for testing only + public static void setMockFTPClient(FTPClient FTPClient) { + mockFTPClient = FTPClient; + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/manager/TestMainframeManager.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/TestMainframeManager.java b/src/test/org/apache/sqoop/manager/TestMainframeManager.java new file mode 100644 index 0000000..79cbcb1 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/TestMainframeManager.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.sqoop.accumulo.AccumuloUtil; +import org.apache.sqoop.hbase.HBaseUtil; +import org.apache.sqoop.tool.MainframeImportTool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.ConnFactory; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.metastore.JobData; +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; +import com.cloudera.sqoop.util.ImportException; + +/** + * Test methods of the generic SqlManager implementation. + */ +public class TestMainframeManager extends BaseSqoopTestCase { + + private static final Log LOG = LogFactory.getLog(TestMainframeManager.class + .getName()); + + private ConnManager manager; + + private SqoopOptions opts; + + private ImportJobContext context; + + @Before + public void setUp() { + Configuration conf = getConf(); + opts = getSqoopOptions(conf); + opts.setConnectString("dummy.server"); + opts.setTableName("dummy.pds"); + opts.setConnManagerClassName("org.apache.sqoop.manager.MainframeManager"); + context = new ImportJobContext(getTableName(), null, opts, null); + ConnFactory f = new ConnFactory(conf); + try { + this.manager = f.getManager(new JobData(opts, new MainframeImportTool())); + } catch (IOException ioe) { + fail("IOException instantiating manager: " + + StringUtils.stringifyException(ioe)); + } + } + + @After + public void tearDown() { + try { + manager.close(); + } catch (SQLException sqlE) { + LOG.error("Got SQLException: " + sqlE.toString()); + fail("Got SQLException: " + sqlE.toString()); + } + } + + @Test + public void testListColNames() { + String[] colNames = manager.getColumnNames(getTableName()); + assertNotNull("manager should return a column list", colNames); + assertEquals("Column list should be length 1", 1, colNames.length); + assertEquals(MainframeManager.DEFAULT_DATASET_COLUMN_NAME, colNames[0]); + } + + @Test + public void testListColTypes() { + Map<String, Integer> types = manager.getColumnTypes(getTableName()); + assertNotNull("manager should return a column types map", types); + assertEquals("Column types map should be size 1", 1, types.size()); + assertEquals(types.get(MainframeManager.DEFAULT_DATASET_COLUMN_NAME) + .intValue(), Types.VARCHAR); + } + + @Test + public void testImportTableNoHBaseJarPresent() { + HBaseUtil.setAlwaysNoHBaseJarMode(true); + opts.setHBaseTable("dummy_table"); + try { + manager.importTable(context); + fail("An ImportException should be thrown: " + + "HBase jars are not present in classpath, cannot import to HBase!"); + } catch (ImportException e) { + assertEquals(e.toString(), + "HBase jars are not present in classpath, cannot import to HBase!"); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } finally { + opts.setHBaseTable(null); + } + } + + @Test + public void testImportTableNoAccumuloJarPresent() { + AccumuloUtil.setAlwaysNoAccumuloJarMode(true); + opts.setAccumuloTable("dummy_table"); + try { + manager.importTable(context); + fail("An ImportException should be thrown: " + + "Accumulo jars are not present in classpath, cannot import to " + + "Accumulo!"); + } catch (ImportException e) { + assertEquals(e.toString(), + "Accumulo jars are not present in classpath, cannot import to " + + "Accumulo!"); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } finally { + opts.setAccumuloTable(null); + } + } + + @Test + public void testListTables() { + String[] tables = manager.listTables(); + assertNull("manager should not return a list of tables", tables); + } + + @Test + public void testListDatabases() { + String[] databases = manager.listDatabases(); + assertNull("manager should not return a list of databases", databases); + } + + @Test + public void testGetPrimaryKey() { + String primaryKey = manager.getPrimaryKey(getTableName()); + assertNull("manager should not return a primary key", primaryKey); + } + + @Test + public void testReadTable() { + String[] colNames = manager.getColumnNames(getTableName()); + try { + ResultSet table = manager.readTable(getTableName(), colNames); + assertNull("manager should not read a table", table); + } catch (SQLException sqlE) { + fail("Got SQLException: " + sqlE.toString()); + } + } + + @Test + public void testGetConnection() { + try { + Connection con = manager.getConnection(); + assertNull("manager should not return a connection", con); + } catch (SQLException sqlE) { + fail("Got SQLException: " + sqlE.toString()); + } + } + + @Test + public void testGetDriverClass() { + String driverClass = manager.getDriverClass(); + assertNotNull("manager should return a driver class", driverClass); + assertEquals("manager should return an empty driver class", "", + driverClass); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java new file mode 100644 index 0000000..613ee7a --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetFTPRecordReader.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.DBWritable; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.util.MainframeFTPClientUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.lib.DelimiterSet; +import com.cloudera.sqoop.lib.LargeObjectLoader; + +public class TestMainframeDatasetFTPRecordReader { + + private MainframeImportJob mfImportJob; + + private MainframeImportJob avroImportJob; + + private MainframeDatasetInputSplit mfDIS; + + private TaskAttemptContext context; + + private MainframeDatasetRecordReader mfDRR; + + private MainframeDatasetFTPRecordReader mfDFTPRR; + + private FTPClient mockFTPClient; + + public static class DummySqoopRecord extends SqoopRecord { + private String field; + + public Map<String, Object> getFieldMap() { + Map<String, Object> map = new HashMap<String, Object>(); + map.put("fieldName", field); + return map; + } + + public void setField(String fieldName, Object fieldVal) { + if (fieldVal instanceof String) { + field = (String) fieldVal; + } + } + + public void setField(final String val) { + this.field = val; + } + + @Override + public void readFields(DataInput in) throws IOException { + field = in.readUTF(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(field); + } + + @Override + public void readFields(ResultSet rs) throws SQLException { + field = rs.getString(1); + } + + @Override + public void write(PreparedStatement s) throws SQLException { + s.setString(1, field); + } + + @Override + public String toString() { + return field; + } + + @Override + public int write(PreparedStatement stmt, int offset) throws SQLException { + return 0; + } + + @Override + public String toString(DelimiterSet delimiters) { + return null; + } + + @Override + public int getClassFormatVersion() { + return 0; + } + + @Override + public int hashCode() { + return Integer.parseInt(field); + } + + public void loadLargeObjects(LargeObjectLoader loader) { + } + + public void parse(CharSequence s) { + } + + public void parse(Text s) { + } + + public void parse(byte[] s) { + } + + public void parse(char[] s) { + } + + public void parse(ByteBuffer s) { + } + + public void parse(CharBuffer s) { + } + + } + + @Before + public void setUp() throws IOException { + mockFTPClient = mock(FTPClient.class); + MainframeFTPClientUtils.setMockFTPClient(mockFTPClient); + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(true); + when(mockFTPClient.completePendingCommand()).thenReturn(true); + when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.noop()).thenReturn(200); + when(mockFTPClient.setFileType(anyInt())).thenReturn(true); + + FTPFile ftpFile1 = new FTPFile(); + ftpFile1.setType(FTPFile.FILE_TYPE); + ftpFile1.setName("test1"); + FTPFile ftpFile2 = new FTPFile(); + ftpFile2.setType(FTPFile.FILE_TYPE); + ftpFile2.setName("test2"); + FTPFile[] ftpFiles = { ftpFile1, ftpFile2 }; + when(mockFTPClient.listFiles()).thenReturn(ftpFiles); + + when(mockFTPClient.retrieveFileStream("test1")).thenReturn( + new ByteArrayInputStream("123\n456\n".getBytes())); + when(mockFTPClient.retrieveFileStream("test2")).thenReturn( + new ByteArrayInputStream("789\n".getBytes())); + when(mockFTPClient.retrieveFileStream("NotComplete")).thenReturn( + new ByteArrayInputStream("NotComplete\n".getBytes())); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + JobConf conf = new JobConf(); + conf.set(DBConfiguration.URL_PROPERTY, "localhost:" + "11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, DummySqoopRecord.class, + DBWritable.class); + + Job job = Job.getInstance(conf); + mfDIS = new MainframeDatasetInputSplit(); + mfDIS.addDataset("test1"); + mfDIS.addDataset("test2"); + context = mock(TaskAttemptContext.class); + when(context.getConfiguration()).thenReturn(job.getConfiguration()); + mfDFTPRR = new MainframeDatasetFTPRecordReader(); + } + + @After + public void tearDown() { + try { + mfDFTPRR.close(); + } catch (IOException ioe) { + fail("Got IOException: " + ioe.toString()); + } + MainframeFTPClientUtils.setMockFTPClient(null); + } + + @Test + public void testReadAllData() { + try { + mfDFTPRR.initialize(mfDIS, context); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 1, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 0.5, 0.02); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 2, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "456", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 0.5, 0.02); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 3, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "789", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 1, 0.02); + Assert.assertFalse("End of dataset", mfDFTPRR.nextKeyValue()); + } catch (IOException ioe) { + fail("Got IOException: " + ioe.toString()); + } catch (InterruptedException ie) { + fail("Got InterruptedException: " + ie.toString()); + } + } + + @Test + public void testReadPartOfData() { + try { + mfDFTPRR.initialize(mfDIS, context); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + Assert.assertEquals("Key should increase by records", 1, mfDFTPRR + .getCurrentKey().get()); + Assert.assertEquals("Read value by line and by dataset", "123", mfDFTPRR + .getCurrentValue().toString()); + Assert.assertEquals("Get progress according to left dataset", + mfDFTPRR.getProgress(), (float) 0.5, 0.02); + } catch (IOException ioe) { + fail("Got IOException: " + ioe.toString()); + } catch (InterruptedException ie) { + fail("Got InterruptedException: " + ie.toString()); + } + } + + @Test + public void testFTPNotComplete() { + try { + mfDIS = new MainframeDatasetInputSplit(); + mfDIS.addDataset("NotComplete"); + mfDFTPRR.initialize(mfDIS, context); + Assert.assertTrue("Retrieve of dataset", mfDFTPRR.nextKeyValue()); + when(mockFTPClient.completePendingCommand()).thenReturn(false); + mfDFTPRR.nextKeyValue(); + } catch (IOException ioe) { + Assert.assertEquals( + "java.io.IOException: IOException during data transfer: " + + "java.io.IOException: Failed to complete ftp command.", + ioe.toString()); + } catch (InterruptedException ie) { + fail("Got InterruptedException: " + ie.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java new file mode 100644 index 0000000..70958e0 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputFormat.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; +import org.apache.sqoop.util.MainframeFTPClientUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestMainframeDatasetInputFormat { + + private MainframeDatasetInputFormat<SqoopRecord> format; + + private FTPClient mockFTPClient; + + @Before + public void setUp() { + format = new MainframeDatasetInputFormat<SqoopRecord>(); + mockFTPClient = mock(FTPClient.class); + MainframeFTPClientUtils.setMockFTPClient(mockFTPClient); + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(true); + when(mockFTPClient.completePendingCommand()).thenReturn(true); + when(mockFTPClient.changeWorkingDirectory(anyString())).thenReturn(true); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.getReplyString()).thenReturn(""); + when(mockFTPClient.noop()).thenReturn(200); + when(mockFTPClient.setFileType(anyInt())).thenReturn(true); + + FTPFile ftpFile1 = new FTPFile(); + ftpFile1.setType(FTPFile.FILE_TYPE); + ftpFile1.setName("test1"); + FTPFile ftpFile2 = new FTPFile(); + ftpFile2.setType(FTPFile.FILE_TYPE); + ftpFile2.setName("test2"); + FTPFile[] ftpFiles = { ftpFile1, ftpFile2 }; + when(mockFTPClient.listFiles()).thenReturn(ftpFiles); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + } + + @After + public void tearDown() { + MainframeFTPClientUtils.setMockFTPClient(null); + } + + @Test + public void testRetrieveDatasets() throws IOException { + JobConf conf = new JobConf(); + conf.set(DBConfiguration.URL_PROPERTY, "localhost:12345"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + String dsName = "dsName1"; + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName); + Job job = Job.getInstance(conf); + format.getSplits(job); + + List<InputSplit> splits = new ArrayList<InputSplit>(); + splits = ((MainframeDatasetInputFormat<SqoopRecord>) format).getSplits(job); + Assert.assertEquals("test1", ((MainframeDatasetInputSplit) splits.get(0)) + .getNextDataset().toString()); + Assert.assertEquals("test2", ((MainframeDatasetInputSplit) splits.get(1)) + .getNextDataset().toString()); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java new file mode 100644 index 0000000..5d92f6d --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetInputSplit.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.io.IOException; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestMainframeDatasetInputSplit { + + private MainframeDatasetInputSplit mfDatasetInputSplit; + + @Before + public void setUp() { + mfDatasetInputSplit = new MainframeDatasetInputSplit(); + } + + @Test + public void testGetCurrentDataset() { + String currentDataset = mfDatasetInputSplit.getCurrentDataset(); + Assert.assertNull(currentDataset); + } + + @Test + public void testGetNextDatasetWithNull() { + String currentDataset = mfDatasetInputSplit.getNextDataset(); + Assert.assertNull(currentDataset); + } + + @Test + public void testGetNextDataset() { + String mainframeDataset = "test"; + mfDatasetInputSplit.addDataset(mainframeDataset); + String currentDataset = mfDatasetInputSplit.getNextDataset(); + Assert.assertEquals("test", currentDataset); + } + + @Test + public void testHasMoreWithFalse() { + boolean retVal = mfDatasetInputSplit.hasMore(); + Assert.assertFalse(retVal); + } + + @Test + public void testHasMoreWithTrue() { + String mainframeDataset = "test"; + mfDatasetInputSplit.addDataset(mainframeDataset); + boolean retVal = mfDatasetInputSplit.hasMore(); + Assert.assertTrue(retVal); + } + + @Test + public void testGetLength() { + String mainframeDataset = "test"; + mfDatasetInputSplit.addDataset(mainframeDataset); + try { + long retVal = mfDatasetInputSplit.getLength(); + Assert.assertEquals(1, retVal); + } catch (IOException ioe) { + Assert.fail("No IOException should be thrown!"); + } catch (InterruptedException ie) { + Assert.fail("No InterruptedException should be thrown!"); + } + } + + @Test + public void testGetLocations() { + try { + String[] retVal = mfDatasetInputSplit.getLocations(); + Assert.assertNotNull(retVal); + } catch (IOException ioe) { + Assert.fail("No IOException should be thrown!"); + } catch (InterruptedException ie) { + Assert.fail("No InterruptedException should be thrown!"); + } + } + + @Test + public void testWriteRead() { + mfDatasetInputSplit.addDataset("dataSet1"); + mfDatasetInputSplit.addDataset("dataSet2"); + DataOutputBuffer dob = new DataOutputBuffer(); + DataInputBuffer dib = new DataInputBuffer(); + MainframeDatasetInputSplit mfReader = new MainframeDatasetInputSplit(); + try { + mfDatasetInputSplit.write(dob); + dib.reset(dob.getData(), dob.getLength()); + mfReader.readFields(dib); + Assert.assertNotNull("MFReader get data from tester", mfReader); + Assert.assertEquals(2, mfReader.getLength()); + Assert.assertEquals("dataSet1", mfReader.getNextDataset()); + Assert.assertEquals("dataSet2", mfReader.getNextDataset()); + } catch (IOException ioe) { + Assert.fail("No IOException should be thrown!"); + } catch (InterruptedException ie) { + Assert.fail("No InterruptedException should be thrown!"); + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java new file mode 100644 index 0000000..ecaa8d5 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Mapper; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ImportJobContext; + +public class TestMainframeImportJob { + + private MainframeImportJob mfImportJob; + + private MainframeImportJob avroImportJob; + + private SqoopOptions options; + + @Before + public void setUp() { + options = new SqoopOptions(); + } + + @Test + public void testGetMainframeDatasetImportMapperClass() + throws SecurityException, NoSuchMethodException, + IllegalArgumentException, IllegalAccessException, + InvocationTargetException { + String jarFile = "dummyJarFile"; + String tableName = "dummyTableName"; + Path path = new Path("dummyPath"); + ImportJobContext context = new ImportJobContext(tableName, jarFile, + options, path); + mfImportJob = new MainframeImportJob(options, context); + + // To access protected method by means of reflection + Class[] types = {}; + Method m_getMapperClass = MainframeImportJob.class.getDeclaredMethod( + "getMapperClass", types); + m_getMapperClass.setAccessible(true); + Class<? extends Mapper> mapper = (Class<? extends Mapper>) m_getMapperClass + .invoke(mfImportJob); + assertEquals(mapper, + org.apache.sqoop.mapreduce.mainframe.MainframeDatasetImportMapper.class); + } + + @Test + public void testSuperMapperClass() throws SecurityException, + NoSuchMethodException, IllegalArgumentException, IllegalAccessException, + InvocationTargetException { + String jarFile = "dummyJarFile"; + String tableName = "dummyTableName"; + Path path = new Path("dummyPath"); + options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile); + ImportJobContext context = new ImportJobContext(tableName, jarFile, + options, path); + avroImportJob = new MainframeImportJob(options, context); + + // To access protected method by means of reflection + Class[] types = {}; + Method m_getMapperClass = MainframeImportJob.class.getDeclaredMethod( + "getMapperClass", types); + m_getMapperClass.setAccessible(true); + Class<? extends Mapper> mapper = (Class<? extends Mapper>) m_getMapperClass + .invoke(avroImportJob); + assertEquals(mapper, org.apache.sqoop.mapreduce.AvroImportMapper.class); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java new file mode 100644 index 0000000..936afd3 --- /dev/null +++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.tool; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.cli.RelatedOptions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; +import com.cloudera.sqoop.cli.ToolOptions; +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; + +public class TestMainframeImportTool extends BaseSqoopTestCase { + + private static final Log LOG = LogFactory.getLog(TestMainframeImportTool.class + .getName()); + + private MainframeImportTool mfImportTool; + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + + @Before + public void setUp() { + + mfImportTool = new MainframeImportTool(); + System.setOut(new PrintStream(outContent)); + } + + @After + public void tearDown() { + System.setOut(null); + } + + @Test + public void testPrintHelp() { + ToolOptions toolOptions = new ToolOptions(); + String separator = System.getProperty("line.separator"); + mfImportTool.printHelp(toolOptions); + String outputMsg = "usage: sqoop " + + mfImportTool.getToolName() + + " [GENERIC-ARGS] [TOOL-ARGS]" + + separator + + "" + + separator + + "" + + separator + + "Generic Hadoop command-line arguments:" + + separator + + "(must preceed any tool-specific arguments)" + + separator + + "Generic options supported are" + + separator + + "-conf <configuration file> specify an application configuration file" + + separator + + "-D <property=value> use value for given property" + + separator + + "-fs <local|namenode:port> specify a namenode" + + separator + + "-jt <local|jobtracker:port> specify a job tracker" + + separator + + "-files <comma separated list of files> " + + "specify comma separated files to be copied to the map reduce cluster" + + separator + + "-libjars <comma separated list of jars> " + + "specify comma separated jar files to include in the classpath." + + separator + + "-archives <comma separated list of archives> " + + "specify comma separated archives to be unarchived on the compute machines.\n" + + separator + "The general command line syntax is" + separator + + "bin/hadoop command [genericOptions] [commandOptions]\n" + separator + + "" + separator + "At minimum, you must specify --connect and --" + + MainframeImportTool.DS_ARG + separator; + assertEquals(outputMsg, outContent.toString()); + } + + @SuppressWarnings("deprecation") + @Test + public void testGetImportOptions() throws SecurityException, + NoSuchMethodException, IllegalArgumentException, IllegalAccessException, + InvocationTargetException { + // To access protected method by means of reflection + Class[] types = {}; + Object[] params = {}; + Method m_getImportOptions = MainframeImportTool.class.getDeclaredMethod( + "getImportOptions", types); + m_getImportOptions.setAccessible(true); + RelatedOptions rOptions = (RelatedOptions) m_getImportOptions.invoke( + mfImportTool, params); + assertNotNull("It should return a RelatedOptions", rOptions); + assertTrue(rOptions.hasOption(MainframeImportTool.DS_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.DELETE_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.TARGET_DIR_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.WAREHOUSE_DIR_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.FMT_TEXTFILE_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.NUM_MAPPERS_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.MAPREDUCE_JOB_NAME)); + assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESS_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESSION_CODEC_ARG)); + } + + @Test + public void testApplyOptions() + throws InvalidOptionsException, ParseException { + String[] args = { "--" + MainframeImportTool.DS_ARG, "dummy_ds" }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + assertEquals(sqoopOption.getConnManagerClassName(), + "org.apache.sqoop.manager.MainframeManager"); + assertEquals(sqoopOption.getTableName(), "dummy_ds"); + } + + @Test + public void testNotApplyOptions() throws ParseException, + InvalidOptionsException { + String[] args = new String[] { "--connection-manager=dummy_ClassName" }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + assertEquals(sqoopOption.getConnManagerClassName(), "dummy_ClassName"); + assertNull(sqoopOption.getTableName()); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/268299ee/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java new file mode 100644 index 0000000..6b89502 --- /dev/null +++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.commons.net.ftp.FTPClient; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sqoop.mapreduce.JobBase; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestMainframeFTPClientUtils { + + private JobConf conf; + + private FTPClient mockFTPClient; + + @Before + public void setUp() { + conf = new JobConf(); + mockFTPClient = mock(FTPClient.class); + when(mockFTPClient.getReplyString()).thenReturn(""); + MainframeFTPClientUtils.setMockFTPClient(mockFTPClient); + } + + @After + public void tearDown() { + MainframeFTPClientUtils.setMockFTPClient(null); + } + + @Test + public void testAnonymous_VERBOSE_IllegelPort() { + try { + when(mockFTPClient.login("anonymous", "")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(200); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + conf.set(DBConfiguration.URL_PROPERTY, "localhost:testPort"); + conf.setBoolean(JobBase.PROPERTY_VERBOSE, true); + + FTPClient ftp = null; + boolean success = false; + try { + ftp = MainframeFTPClientUtils.getFTPConnection(conf); + } catch (IOException ioe) { + fail("No IOException should be thrown!"); + } finally { + success = MainframeFTPClientUtils.closeFTPConnection(ftp); + } + Assert.assertTrue(success); + } + + @Test + public void testCannotConnect() { + try { + when(mockFTPClient.login("testUser", "")).thenReturn(false); + } catch (IOException ioe) { + fail("No IOException should be thrown!"); + } + + conf.set(DBConfiguration.URL_PROPERTY, "testUser:11111"); + try { + MainframeFTPClientUtils.getFTPConnection(conf); + } catch (IOException ioe) { + Assert.assertEquals( + "java.io.IOException: FTP server testUser refused connection:", + ioe.toString()); + } + } + + @Test + public void testWrongUsername() { + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(200); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + FTPClient ftp = null; + conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "userr"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + try { + ftp = MainframeFTPClientUtils.getFTPConnection(conf); + } catch (IOException ioe) { + Assert.assertEquals( + "java.io.IOException: Could not login to server localhost:", + ioe.toString()); + } + Assert.assertNull(ftp); + } + + @Test + public void testNotListDatasets() { + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(200); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "userr"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + try { + MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf); + } catch (IOException ioe) { + Assert.assertEquals("java.io.IOException: " + + "Could not list datasets from pdsName:" + + "java.io.IOException: Could not login to server localhost:", + ioe.toString()); + } + } +}
