Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 77be2a8f9 -> 4e2204504
SQOOP-2463: Sqoop2: Add support for schema-less to schema-less transfer for CSV IDF

(Abraham Fine via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/4e220450
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/4e220450
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/4e220450

Branch: refs/heads/sqoop2
Commit: 4e22045043dfa3d3c3032d4a433c300980728984
Parents: 77be2a8
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Wed Oct 21 08:06:41 2015 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Wed Oct 21 08:06:41 2015 -0700

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsExtractor.java     | 28 ++++----
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 26 ++++---
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  2 +-
 .../org/apache/sqoop/test/utils/HdfsUtils.java  |  4 +-
 .../connector/hdfs/FromHDFSToHDFSTest.java      | 74 ++++++++++++++++++++
 5 files changed, 109 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index b35c957..23bbcc0 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.hdfs;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -38,6 +39,7 @@ import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.schema.Schema;
 
 /**
@@ -112,12 +114,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     boolean hasNext = filereader.next(line);
     while (hasNext) {
       rowsRead++;
-      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
-        Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
-        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
-      } else {
-        dataWriter.writeStringRecord(line.toString());
-      }
+      extractRow(linkConfiguration, fromJobConfiguration, line);
       line = new Text();
       hasNext = filereader.next(line);
       if (filereader.getPosition() >= end && filereader.syncSeen()) {
@@ -180,12 +177,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
         next = fileseeker.getPos();
       }
       rowsRead++;
-      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
-        Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
-        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
-      } else {
-        dataWriter.writeStringRecord(line.toString());
-      }
+      extractRow(linkConfiguration, fromJobConfiguration, line);
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
     filestream.close();
@@ -213,5 +205,17 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     return true;
   }
 
+  private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
+    if (schema instanceof ByteArraySchema) {
+      dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});
+    } else if (!HdfsUtils.hasCustomFormat(linkConfiguration,
+        fromJobConfiguration)) {
+      dataWriter.writeStringRecord(line.toString());
+    } else {
+      Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
+      dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
+    }
+  }
+
 }
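The extracted per-row logic now dispatches three ways: schema-less (ByteArraySchema) rows are forwarded as raw bytes, unformatted rows as plain text, and custom-formatted rows as parsed CSV columns. A rough standalone sketch of that branching, where the Schema marker types, the charset, and the CSV "parser" are simplified stand-ins for the real Sqoop classes (ByteArraySchema, SqoopIDFUtils.BYTE_FIELD_CHARSET, SqoopIDFUtils.fromCSV):

import java.nio.charset.StandardCharsets;

// Standalone sketch of the three-way dispatch that extractRow() introduces.
public class ExtractRowSketch {

  interface Schema {}
  static final class ByteArraySchema implements Schema {}
  static final class CsvSchema implements Schema {}

  static Object extractRow(Schema schema, boolean hasCustomFormat, String line) {
    if (schema instanceof ByteArraySchema) {
      // Schema-less transfer: forward the raw bytes as a single-column record.
      return new Object[]{ line.getBytes(StandardCharsets.ISO_8859_1) };
    } else if (!hasCustomFormat) {
      // No custom format configured: hand the text line through untouched.
      return line;
    } else {
      // Custom format: split the line into columns before writing
      // (real parsing is done by SqoopIDFUtils.fromCSV).
      return line.split(",");
    }
  }

  public static void main(String[] args) {
    String line = "1,'USA','2004-10-23','San Francisco'";
    System.out.println(extractRow(new ByteArraySchema(), false, line).getClass().getSimpleName()); // Object[]
    System.out.println(extractRow(new CsvSchema(), true, line).getClass().getSimpleName());        // String[]
  }
}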
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 948b1b6..798e552 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
@@ -86,21 +87,24 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
       filewriter.initialize(filepath, conf, codec);
 
-      if (HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig)) {
-        Object[] record;
-
-        while ((record = reader.readArrayRecord()) != null) {
-          filewriter.write(
-              SqoopIDFUtils.toCSV(
-                  HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
-                  context.getSchema()));
+      if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
+        String record;
+        while ((record = reader.readTextRecord()) != null) {
+          if (context.getSchema() instanceof ByteArraySchema) {
+            filewriter.write(SqoopIDFUtils.toText(record));
+          } else {
+            filewriter.write(record);
+          }
           rowsWritten++;
         }
       } else {
-        String record;
+        Object[] record;
 
-        while ((record = reader.readTextRecord()) != null) {
-          filewriter.write(record);
+        while ((record = reader.readArrayRecord()) != null) {
+          filewriter.write(
+              SqoopIDFUtils.toCSV(
+                  HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
+                  context.getSchema()));
           rowsWritten++;
         }
       }
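On the load side the same distinction appears with the branches inverted: byte-array and plain-text transfers both drain readTextRecord(), with ByteArraySchema records re-escaped via toText() before hitting the file, while typed transfers still go through readArrayRecord() and toCSV(). A minimal sketch of just that path selection, with the reader and the IDF helpers mocked out (none of these stand-ins are the real Sqoop API):

import java.util.Arrays;
import java.util.Iterator;

// Standalone sketch of HdfsLoader's new read-path selection.
public class LoaderPathSketch {

  // The key predicate from the patch: byte-array schemas take the text path
  // even when a custom format is configured, since there are no typed
  // columns to reformat.
  static boolean useTextPath(boolean hasCustomFormat, boolean byteArraySchema) {
    return !hasCustomFormat || byteArraySchema;
  }

  static String writeOne(String record, boolean byteArraySchema) {
    // Stand-in for SqoopIDFUtils.toText(): byte payloads get re-escaped,
    // plain text records are written as-is.
    return byteArraySchema ? "'" + record + "'" : record;
  }

  public static void main(String[] args) {
    System.out.println(useTextPath(false, false)); // plain text transfer -> true
    System.out.println(useTextPath(true, true));   // schema-less transfer -> still true
    System.out.println(useTextPath(true, false));  // typed columns -> false (CSV path)

    Iterator<String> reader = Arrays.asList("1,'USA'", "2,'Czech Republic'").iterator();
    while (reader.hasNext()) {
      System.out.println(writeOne(reader.next(), true));
    }
  }
}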
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 937ef5a..c93813b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -85,7 +85,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
-    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), fromSchema);
+    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), matcher.getFromSchema());
 
     try {
       LOG.info("Starting progress service");


http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
index 610156e..0369994 100644
--- a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
+++ b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
@@ -64,7 +64,9 @@ public class HdfsUtils {
     LinkedList<Path> files = new LinkedList<Path>();
     for (FileStatus fileStatus : fs.listStatus(new Path(directory), filterHiddenFiles)) {
       LOG.debug("Found mapreduce output file: " + fileStatus.getPath() + " with size " + fileStatus.getLen());
-      files.add(fileStatus.getPath());
+      if (fileStatus.isFile()) {
+        files.add(fileStatus.getPath());
+      }
     }
     return files.toArray(new Path[files.size()]);
   }


http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
new file mode 100644
index 0000000..4b2fa06
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.Test;
+
+/**
+ * Test schemaless to schemaless transfer by using two hdfs connectors
+ */
+public class FromHDFSToHDFSTest extends ConnectorTestCase {
+
+  @Test
+  public void test() throws Exception {
+    String[] sampleData = new String[]{
+        "1,'USA','2004-10-23','San Francisco'",
+        "2,'USA','2004-10-24','Sunnyvale'",
+        "3,'Czech Republic','2004-10-25','Brno'",
+        "4,'USA','2004-10-26','Palo Alto'"
+    };
+
+    createFromFile("input-0001", sampleData);
+
+    MLink hdfsLinkFrom = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLinkFrom);
+    saveLink(hdfsLinkFrom);
+
+    MLink hdfsLinkTo = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLinkTo);
+    saveLink(hdfsLinkTo);
+
+    MJob job = getClient().createJob(hdfsLinkFrom.getPersistenceId(), hdfsLinkTo.getPersistenceId());
+
+    fillHdfsFromConfig(job);
+
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    hdfsClient.mkdirs(new Path(HdfsUtils.joinPathFragments
+        (getMapreduceDirectory(), "TO")));
+
+    job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
+        .setValue(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+
+
+    MDriverConfig driverConfig = job.getDriverConfig();
+    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+    saveJob(job);
+
+    executeJob(job);
+
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), sampleData);
+  }
+}
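The integration test expects the sample rows back byte-for-byte: with no schema on either side, the CSV intermediate format carries each input line as a single escaped byte-array field, and the loader unescapes it on the way out. A toy illustration of that escape/unescape round trip, with the quoting rules deliberately simplified (the real logic lives in SqoopIDFUtils and handles many more cases):

// Toy escape/unescape round trip for a schema-less record.
public class ByteArrayRoundTrip {

  static String escape(String raw) {
    // Wrap the payload in single quotes, escaping any embedded quotes.
    return "'" + raw.replace("'", "\\'") + "'";
  }

  static String unescape(String csv) {
    // Strip the outer quotes and undo the embedded-quote escaping.
    return csv.substring(1, csv.length() - 1).replace("\\'", "'");
  }

  public static void main(String[] args) {
    String line = "3,'Czech Republic','2004-10-25','Brno'";
    String onTheWire = escape(line);
    System.out.println(onTheWire);
    System.out.println(unescape(onTheWire).equals(line)); // true
  }
}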
