Repository: sqoop Updated Branches: refs/heads/trunk 973629912 -> 18212bece
SQOOP-3326: Mainframe FTP listing for GDG should filter out non-GDG datasets in a heterogeneous listing (Chris Teoh via Szabolcs Vasas) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/18212bec Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/18212bec Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/18212bec Branch: refs/heads/trunk Commit: 18212becec800336533697c5795a06bcd38e6242 Parents: 9736299 Author: Szabolcs Vasas <[email protected]> Authored: Wed Oct 10 15:40:26 2018 +0200 Committer: Szabolcs Vasas <[email protected]> Committed: Wed Oct 10 15:40:26 2018 +0200 ---------------------------------------------------------------------- build.xml | 7 +- .../mainframe/MainframeConfiguration.java | 2 +- .../MainframeFTPFileGdgEntryParser.java | 86 +++++++++++++++++++ .../sqoop/util/MainframeFTPClientUtils.java | 10 ++- .../sqoop-thirdpartytest-db-services.yml | 2 +- .../mainframe/MainframeManagerImportTest.java | 7 ++ .../manager/mainframe/MainframeTestUtil.java | 9 ++ .../TestMainframeFTPFileGdgEntryParser.java | 88 ++++++++++++++++++++ .../sqoop/util/TestMainframeFTPClientUtils.java | 2 +- 9 files changed, 207 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index cd2e9e2..f397531 100644 --- a/build.xml +++ b/build.xml @@ -262,7 +262,9 @@ <property name="sqoop.test.mainframe.ftp.binary.dataset.seq" value="TSODIQ1.FOLDER.FOLDERTXT" /> <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="FOLDERTXT" /> <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="1591c0fcc718fda7e9c1f3561d232b2b" /> - + <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed" value="TSODIQ1.MIXED" /> + <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed.filename" value="G0039V00" /> + <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed.md5" value="5e7f4ec7cbeae8e0e0b4d88346eb9349" /> <property name="s3.bucket.url" value="" /> <property name="s3.generator.command" value="" /> @@ -912,6 +914,9 @@ <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq" value="${sqoop.test.mainframe.ftp.binary.dataset.seq}" /> <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.filename}" /> <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.md5}" /> + <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed}" /> + <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed.filename}" /> + <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed.md5}" /> <!-- Location of Hive logs --> <!--<sysproperty key="hive.log.dir" value="${test.build.data}/sqoop/logs"/> --> http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java index 9d6a2fe..9842daa 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java @@ -31,7 +31,7 @@ public class MainframeConfiguration public static final String MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED = "p"; public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape"; - + public static final String MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME = MainframeFTPFileGdgEntryParser.class.getName(); public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser"; public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java new file mode 100644 index 0000000..8467afa --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.net.ftp.FTPClientConfig; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.parser.ConfigurableFTPFileEntryParserImpl; + +public class MainframeFTPFileGdgEntryParser extends ConfigurableFTPFileEntryParserImpl { +/* Sample FTP listing +Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname +H19761 Tape G0034V00 +H81751 Tape G0035V00 +H73545 Tape G0036V00 +G10987 Tape G0037V00 +SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY +SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY +SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO + +* And what we need to get back from parsing are the following entries:- +H19761 Tape G0034V00 +H81751 Tape G0035V00 +H73545 Tape G0036V00 +G10987 Tape G0037V00 +*/ + + private static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm"; + private static final String HEADER = "Volume Unit "; + private static String GDG_REGEX = "^\\S+\\s+.*?\\s+(G\\d{4}V\\d{2})$"; + private static final Log LOG = LogFactory.getLog(MainframeFTPFileGdgEntryParser.class.getName()); + + public MainframeFTPFileGdgEntryParser() { + super(GDG_REGEX); + LOG.info("MainframeFTPFileGdgEntryParser default constructor"); + } + + @Override + public FTPFile parseFTPEntry(String entry) { + LOG.info("parseFTPEntry: "+entry); + if (isFtpListingHeader(entry)) { + return null; + } + if (matches(entry)) { + String dsName = group(1); + return createFtpFile(entry,dsName); + } + return null; + } + + protected FTPFile createFtpFile(String entry, String dsName) { + FTPFile file = new FTPFile(); + file.setRawListing(entry); + file.setName(dsName); + file.setType(FTPFile.FILE_TYPE); + return file; + } + + protected Boolean isFtpListingHeader(String entry) { + return entry.startsWith(HEADER); + } + + @Override + protected FTPClientConfig getDefaultConfiguration() { + return new FTPClientConfig(FTPClientConfig.SYST_MVS, + DEFAULT_DATE_FORMAT, null, null, null, null); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java index 654721e..e7c48a6 100644 --- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java +++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java @@ -86,8 +86,14 @@ public final class MainframeFTPClientUtils { ftp.changeWorkingDirectory("'" + pdsName + "'"); FTPFile[] ftpFiles = null; if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) { - // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets - FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, ""); + FTPListParseEngine parser = null; + if (MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG.equals(dsType)) { + // use GDG specific parser to filter out non GDG datasets + parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME, ""); + } else { + // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets + parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, ""); + } List<FTPFile> listing = new ArrayList<FTPFile>(); while(parser.hasNext()) { FTPFile[] files = parser.getNext(25); http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml ---------------------------------------------------------------------- diff --git a/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml b/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml index 4648f54..b4cf488 100644 --- a/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml +++ b/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml @@ -112,7 +112,7 @@ services: timeout: 10s retries: 20 mainframe: - image: cntroversycubed/sqoopgdg:afdf57b15d8e71eb77d24d606b77e185ef39ceb3 + image: cntroversycubed/sqoopgdg:42e6c3a1229a6cdf346eb3976bd7298091ea11e2 container_name: sqoop_mainframe_gdg_container ports: - 2121:2121 http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java index 3b8ed23..af5c754 100644 --- a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java +++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java @@ -149,6 +149,13 @@ public class MainframeManagerImportTest extends ImportJobTestCase { doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000"); } + @Test + public void testImportMixedBinaryWithBufferSize() throws IOException { + HashMap<String,String> files = new HashMap<String,String>(); + files.put(MainframeTestUtil.MIXED_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_MIXED_BINARY_DATASET_MD5); + doImportAndVerify(MainframeTestUtil.MIXED_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000"); + } + private String [] getArgv(String datasetName, String datasetType, String ... extraArgs) { ArrayList<String> args = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java index 9f86f6c..a65aea6 100644 --- a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java +++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java @@ -68,4 +68,13 @@ public class MainframeTestUtil { public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty( "sqoop.test.mainframe.ftp.binary.dataset.seq.md5", "1591c0fcc718fda7e9c1f3561d232b2b"); + public static final String MIXED_BINARY_DATASET_NAME = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.mixed", + "TSODIQ1.MIXED"); + public static final String MIXED_BINARY_DATASET_FILENAME = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.mixed.filename", + "G0039V00"); + public static final String EXPECTED_MIXED_BINARY_DATASET_MD5 = System.getProperty( + "sqoop.test.mainframe.ftp.binary.dataset.mixed.md5", + "5e7f4ec7cbeae8e0e0b4d88346eb9349"); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java new file mode 100644 index 0000000..521a042 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.mainframe; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.commons.net.ftp.FTPFile; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestMainframeFTPFileGdgEntryParser { + /* Sample FTP listing + Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname + H19761 Tape G0034V00 + H81751 Tape G0035V00 + H73545 Tape G0036V00 + G10987 Tape G0037V00 + SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY + SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY + SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO + + * And what we need to get back from parsing are the following entries:- + H19761 Tape G0034V00 + H81751 Tape G0035V00 + H73545 Tape G0036V00 + G10987 Tape G0037V00 + */ + private final static String FTP_LIST_HEADER = "Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname"; + private final String DSNAME = "G0034V00"; + private final String ENTRY = String.format("H19761 Tape %s",DSNAME); + private List<String> listing; + private MainframeFTPFileGdgEntryParser parser; + @Before + public void setUpBefore() throws Exception { + parser = new MainframeFTPFileGdgEntryParser(); + listing = new ArrayList<>(); + listing.add("Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname"); + listing.add(ENTRY); + listing.add("H81751 Tape G0035V00"); + listing.add("H73545 Tape G0036V00"); + listing.add("G10987 Tape G0037V00"); + listing.add("SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY"); + listing.add("SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY"); + listing.add("SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO"); + } + + @Test + public void testIsHeader() { + assertTrue(parser.isFtpListingHeader(FTP_LIST_HEADER)); + } + + @Test + public void testCreateFtpFile() { + FTPFile file = parser.createFtpFile(ENTRY, DSNAME); + assertEquals(ENTRY,file.getRawListing()); + assertEquals(DSNAME,file.getName()); + } + + @Test + public void testParseFTPEntry() { + final int EXPECTED_RECORD_COUNT=4; + long i = listing.stream() + .map(parser::parseFTPEntry) + .filter(Objects::nonNull) + .count(); + assertEquals(EXPECTED_RECORD_COUNT,i); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/18212bec/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java index 90a8519..0714bdc 100644 --- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java +++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java @@ -297,7 +297,7 @@ public class TestMainframeFTPClientUtils { FTPFile file2 = new FTPFile(); file2.setName("G0101V00"); file2.setType(FTPFile.FILE_TYPE); - when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine); + when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine); when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false); when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2}); } catch (IOException e) {
