Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 015bb94d4 -> c820aaf87
SQOOP-1678: Sqoop2: [HDFS Connector] Configurable null values

(Abraham Elmahrek via Venkat Ranganathan)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c820aaf8
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c820aaf8
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c820aaf8

Branch: refs/heads/sqoop2
Commit: c820aaf870ddfacc1039f559b138cb9e72b0a287
Parents: 015bb94
Author: Venkat Ranganathan <[email protected]>
Authored: Wed Dec 10 20:50:15 2014 -0800
Committer: Venkat Ranganathan <[email protected]>
Committed: Wed Dec 10 20:50:15 2014 -0800

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsConstants.java     |  2 +
 .../sqoop/connector/hdfs/HdfsExtractor.java     | 32 ++++++---
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 14 +++-
 .../apache/sqoop/connector/hdfs/HdfsUtils.java  | 73 ++++++++++++++++++++
 .../hdfs/configuration/FromJobConfig.java       |  4 ++
 .../hdfs/configuration/ToJobConfig.java         |  6 ++
 .../resources/hdfs-connector-config.properties  | 18 +++++
 .../sqoop/connector/hdfs/TestExtractor.java     | 71 ++++++++++++++++---
 .../sqoop/connector/hdfs/TestHdfsBase.java      | 34 +++++++--
 .../sqoop/connector/hdfs/TestHdfsUtils.java     | 45 +++++++++++-
 .../apache/sqoop/connector/hdfs/TestLoader.java | 67 +++++++++++++++++-
 11 files changed, 335 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index 6e369c6..bd74bec 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -26,6 +26,8 @@ public final class HdfsConstants extends Constants {
 
   public static final String RESOURCE_BUNDLE_NAME = "hdfs-connector-config";
 
+  public static final char DEFAULT_FIELD_DELIMITER = ',';
+
   public static final char DEFAULT_RECORD_DELIMITER = '\n';
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 2586f94..3c417b4 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -52,7 +52,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
 
   @Override
   public void extract(ExtractorContext context, LinkConfiguration linkConfiguration,
-      FromJobConfiguration jobConfig, HdfsPartition partition) {
+      FromJobConfiguration jobConfiguration, HdfsPartition partition) {
     conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     dataWriter = context.getDataWriter();
@@ -62,14 +62,16 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
       LOG.info("Working on partition: " + p);
       int numFiles = p.getNumberOfFiles();
       for (int i = 0; i < numFiles; i++) {
-        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+        extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
       }
     } catch (IOException e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
     }
   }
 
-  private void extractFile(Path file, long start, long length)
+  private void extractFile(LinkConfiguration linkConfiguration,
+                           FromJobConfiguration fromJobConfiguration,
+                           Path file, long start, long length)
       throws IOException {
     long end = start + length;
     LOG.info("Extracting file " + file);
@@ -77,9 +79,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
     if(isSequenceFile(file)) {
-      extractSequenceFile(file, start, length);
+      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length);
     } else {
-      extractTextFile(file, start, length);
+      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
     }
   }
 
@@ -91,7 +93,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private void extractSequenceFile(Path file, long start, long length)
+  private void extractSequenceFile(LinkConfiguration linkConfiguration,
+                                   FromJobConfiguration fromJobConfiguration,
+                                   Path file, long start, long length)
       throws IOException {
     LOG.info("Extracting sequence file");
     long end = start + length;
@@ -106,7 +110,11 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     boolean hasNext = filereader.next(line);
     while (hasNext) {
       rowRead++;
-      dataWriter.writeStringRecord(line.toString());
+      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
+        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
+      } else {
+        dataWriter.writeStringRecord(line.toString());
+      }
       line = new Text();
       hasNext = filereader.next(line);
       if (filereader.getPosition() >= end && filereader.syncSeen()) {
@@ -124,7 +132,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
    * @throws IOException
    */
   @SuppressWarnings("resource")
-  private void extractTextFile(Path file, long start, long length)
+  private void extractTextFile(LinkConfiguration linkConfiguration,
+                               FromJobConfiguration fromJobConfiguration,
+                               Path file, long start, long length)
       throws IOException {
     LOG.info("Extracting text file");
     long end = start + length;
@@ -167,7 +177,11 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
         next = fileseeker.getPos();
       }
       rowRead++;
-      dataWriter.writeStringRecord(line.toString());
+      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
+        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
+      } else {
+        dataWriter.writeStringRecord(line.toString());
+      }
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
     filestream.close();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 6c57cf2..05b0230 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -81,10 +81,18 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
       filewriter.initialize(filepath,conf,codec);
 
-      String csv;
+      if (HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig)) {
+        Object[] record;
 
-      while ((csv = reader.readTextRecord()) != null) {
-        filewriter.write(csv);
+        while ((record = reader.readArrayRecord()) != null) {
+          filewriter.write(HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record));
+        }
+      } else {
+        String record;
+
+        while ((record = reader.readTextRecord()) != null) {
+          filewriter.write(record);
+        }
       }
 
       filewriter.destroy();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
index 352ee17..353c1f2 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
@@ -17,8 +17,11 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 
 /**
  * Utilities for HDFS.
@@ -40,4 +43,74 @@ public class HdfsUtils {
 
     return conf;
   }
+
+  /**
+   * Given the configurations, should the data received be customized?
+   * @param linkConfiguration Link configuration
+   * @param fromJobConfiguration Job configuration
+   * @return boolean
+   */
+  public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration) {
+    return fromJobConfiguration.fromJobConfig.overrideNullValue != null
+        && fromJobConfiguration.fromJobConfig.overrideNullValue;
+  }
+
+  /**
+   * Given the configurations, should the data received be customized?
+   * @param linkConfiguration Link configuration
+   * @param toJobConfiguration Job configuration
+   * @return boolean
+   */
+  public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, ToJobConfiguration toJobConfiguration) {
+    return toJobConfiguration.toJobConfig.overrideNullValue != null
+        && toJobConfiguration.toJobConfig.overrideNullValue;
+  }
+
+  /**
+   * Given a String record as provided by an intermediate data format or existing HDFS output,
+   * format the record according to configuration.
+   * @param linkConfiguration Link configuration
+   * @param fromJobConfiguration Job configuration
+   * @param record String record
+   * @return Object[]
+   */
+  public static Object[] formatRecord(LinkConfiguration linkConfiguration,
+                                      FromJobConfiguration fromJobConfiguration,
+                                      String record) {
+    Object[] arrayRecord = StringUtils.split(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);
+
+    if (fromJobConfiguration.fromJobConfig.overrideNullValue != null
+        && fromJobConfiguration.fromJobConfig.overrideNullValue) {
+      for (int i = 0; i < arrayRecord.length; ++i) {
+        if (arrayRecord[i].equals(fromJobConfiguration.fromJobConfig.nullValue)) {
+          arrayRecord[i] = null;
+        }
+      }
+    }
+
+    return arrayRecord;
+  }
+
+  /**
+   * Given an object array record as provided by an intermediate data format,
+   * format the record according to configuration.
+   * @param linkConfiguration Link configuration
+   * @param toJobConfiguration Job configuration
+   * @param record Record array
+   * @return String
+   */
+  public static String formatRecord(LinkConfiguration linkConfiguration,
+                                    ToJobConfiguration toJobConfiguration,
+                                    Object[] record) {
+    if (toJobConfiguration.toJobConfig.overrideNullValue != null
+        && toJobConfiguration.toJobConfig.overrideNullValue) {
+      for (int i = 0; i < record.length; ++i) {
+        if (record[i] == null) {
+          record[i] = toJobConfiguration.toJobConfig.nullValue;
+        }
+      }
+    }
+
+    return StringUtils.join(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
index 509d772..89ff9aa 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
@@ -29,4 +29,8 @@ import org.apache.sqoop.validation.validators.NotEmpty;
 public class FromJobConfig {
   @Input(size = 255, validators = { @Validator(NotEmpty.class) }) public String inputDirectory;
+
+  @Input(size = 255) public Boolean overrideNullValue;
+
+  @Input(size = 255) public String nullValue;
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index abddbfb..b7a9c3d 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -30,6 +30,12 @@ import org.apache.sqoop.validation.validators.NotEmpty;
 
 @ConfigClass(validators = { @Validator(ToJobConfig.ToJobConfigValidator.class)})
 public class ToJobConfig {
+  public static String DEFAULT_NULL_VALUE = "NULL";
+
+  @Input(size = 255) public Boolean overrideNullValue;
+
+  @Input(size = 255) public String nullValue;
+
   @Input public ToFormat outputFormat;
 
   @Input public ToCompression compression;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 3d088d0..2dca634 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -48,6 +48,15 @@ toJobConfig.outputDirectory.help = Output directory for final data
 toJobConfig.ignored.label = Ignored
 toJobConfig.ignored.help = This value is ignored
 
+toJobConfig.overrideNullValue.label = Override null value
+toJobConfig.overrideNullValue.help = If set to true, then the null value will \
+                                     be overridden with the value set in \
+                                     toJobConfig.nullValue.
+
+toJobConfig.nullValue.label = Null value
+toJobConfig.nullValue.help = Use this particular character or sequence of characters \
+                             as a value representing null when outputting to a file.
+
 # From Job Config
 #
 fromJobConfig.label = From Job configuration
@@ -55,3 +64,12 @@ fromJobConfig.help = Specifies information required to get data from Hadoop ecos
 
 fromJobConfig.inputDirectory.label = Input directory
 fromJobConfig.inputDirectory.help = Directory that should be exported
+
+fromJobConfig.overrideNullValue.label = Override null value
+fromJobConfig.overrideNullValue.help = If set to true, then the null value will \
+                                       be overridden with the value set in \
+                                       fromJobConfig.nullValue.
+
+fromJobConfig.nullValue.label = Null value
+fromJobConfig.nullValue.help = Use this particular character or sequence of characters \
+                               as a value representing null when reading from a file.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index f4b4454..e5b7b2a 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -79,11 +79,11 @@ public class TestExtractor extends TestHdfsBase {
     FileUtils.mkdirs(inputDirectory);
     switch (this.outputFileType) {
       case TEXT_FILE:
-        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
         break;
 
       case SEQUENCE_FILE:
-        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
         break;
     }
   }
@@ -108,10 +108,10 @@ public class TestExtractor extends TestHdfsBase {
       public void writeStringRecord(String text) {
         int index;
         String[] components = text.split(",");
-        Assert.assertEquals(3, components.length);
+        Assert.assertEquals(5, components.length);
 
-        // Value should take on the form <integer>,<float>,'<integer>'
-        // for a single index. IE: 1,1.0,'1'.
+        // Value should take on the form <integer>,<float>,NULL,'<integer>',\N
+        // for a single index. IE: 1,1.0,NULL,'1',\N.
         try {
           index = Integer.parseInt(components[0]);
         } catch (NumberFormatException e) {
@@ -119,8 +119,10 @@ public class TestExtractor extends TestHdfsBase {
         }
 
         Assert.assertFalse(visited[index - 1]);
-        Assert.assertEquals(String.valueOf((double)index), components[1]);
-        Assert.assertEquals("'" + index + "'", components[2]);
+        Assert.assertEquals(String.valueOf((double) index), components[1]);
+        Assert.assertEquals("NULL", components[2]);
+        Assert.assertEquals("'" + index + "'", components[3]);
+        Assert.assertEquals("\\N", components[4]);
 
         visited[index - 1] = true;
       }
@@ -141,4 +143,57 @@ public class TestExtractor extends TestHdfsBase {
       Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]);
     }
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testOverrideNull() throws Exception {
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
+    ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
+      @Override
+      public void writeArrayRecord(Object[] array) {
+        int index;
+        Assert.assertEquals(5, array.length);
+
+        // Value should take on the form <integer>,<float>,NULL,'<integer>',null
+        // for a single index. IE: 1,1.0,NULL,'1',null.
+        try {
+          index = Integer.parseInt(array[0].toString());
+        } catch (NumberFormatException e) {
+          throw new AssertionError("Could not parse int for " + array[0]);
+        }
+
+        Assert.assertFalse(visited[index - 1]);
+        Assert.assertEquals(String.valueOf((double) index), array[1]);
+        Assert.assertEquals("NULL", array[2]);
+        Assert.assertEquals("'" + index + "'", array[3]);
+        Assert.assertNull(array[4]);
+
+        visited[index - 1] = true;
+      }
+
+      @Override
+      public void writeStringRecord(String text) {
+        throw new AssertionError("Should not be writing string.");
+      }
+
+      @Override
+      public void writeRecord(Object obj) {
+        throw new AssertionError("Should not be writing object.");
+      }
+    }, null);
+
+    LinkConfiguration emptyLinkConfig = new LinkConfiguration();
+    FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
+    fromJobConfiguration.fromJobConfig.overrideNullValue = true;
+    // Should skip "NULL" values
+    fromJobConfiguration.fromJobConfig.nullValue = "\\N";
+    HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
+
+    extractor.extract(context, emptyLinkConfig, fromJobConfiguration, partition);
+
+    for (int index = 0; index < NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE; ++index) {
+      Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
index 6eae7fd..ac44595 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
@@ -48,10 +48,17 @@ public class TestHdfsBase {
     return new HdfsPartition(paths, offsets, lengths, locations);
   }
 
+  protected String formatRow(String format, int index) {
+    String row = format.replaceAll("\\%s", "'" + index + "'");
+    row = row.replaceAll("\\%d", Integer.toString(index));
+    return row.replaceAll("\\%f", Double.toString((double)index));
+  }
+
   protected void createTextInput(String indir,
                                  Class<? extends CompressionCodec> clz,
                                  int numberOfFiles,
-                                 int numberOfRows)
+                                 int numberOfRows,
+                                 String format)
       throws IOException, InstantiationException, IllegalAccessException {
     Configuration conf = new Configuration();
@@ -80,8 +87,7 @@ public class TestHdfsBase {
       }
 
       for (int ri = 0; ri < numberOfRows; ri++) {
-        String row = index + "," + (double)index + ",'" + index + "'";
-        filewriter.write(row + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+        filewriter.write(formatRow(format, index) + HdfsConstants.DEFAULT_RECORD_DELIMITER);
         index++;
       }
 
@@ -89,10 +95,19 @@ public class TestHdfsBase {
     }
   }
 
+  protected void createTextInput(String indir,
+                                 Class<? extends CompressionCodec> clz,
+                                 int numberOfFiles,
+                                 int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    createTextInput(indir, clz, numberOfFiles, numberOfRows, "%d,%f,%s");
+  }
+
   protected void createSequenceInput(String indir,
                                      Class<? extends CompressionCodec> clz,
                                      int numberOfFiles,
-                                     int numberOfRows)
+                                     int numberOfRows,
+                                     String format)
       throws IOException, InstantiationException, IllegalAccessException {
     Configuration conf = new Configuration();
@@ -119,8 +134,7 @@ public class TestHdfsBase {
       Text text = new Text();
 
       for (int ri = 0; ri < numberOfRows; ri++) {
-        String row = index + "," + (double)index + ",'" + index + "'";
-        text.set(row);
+        text.set(formatRow(format, index));
         filewriter.append(text, NullWritable.get());
         index++;
       }
@@ -128,4 +142,12 @@ public class TestHdfsBase {
       filewriter.close();
     }
   }
+
+  protected void createSequenceInput(String indir,
+                                     Class<? extends CompressionCodec> clz,
+                                     int numberOfFiles,
+                                     int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    createSequenceInput(indir, clz, numberOfFiles, numberOfRows, "%d,%f,%s");
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
index 63e14ae..bba6502 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
@@ -18,11 +18,12 @@
 package org.apache.sqoop.connector.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.*;
 
 public class TestHdfsUtils {
 
@@ -41,4 +42,44 @@ public class TestHdfsUtils {
     assertEquals(TEST_URI, conf.get("fs.default.name"));
     assertEquals(TEST_URI, conf.get("fs.defaultFS"));
   }
+
+  @Test
+  public void testIsModifiable() throws Exception {
+    LinkConfiguration linkConfiguration = new LinkConfiguration();
+    FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
+    ToJobConfiguration toJobConfiguration = new ToJobConfiguration();
+
+    // No configuration
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration));
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfiguration));
+
+    // Without override
+    fromJobConfiguration.fromJobConfig.nullValue = "\0";
+    toJobConfiguration.toJobConfig.nullValue = "\0";
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration));
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfiguration));
+
+    // With override
+    fromJobConfiguration.fromJobConfig.overrideNullValue = true;
+    toJobConfiguration.toJobConfig.overrideNullValue = true;
+    assertTrue(HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration));
+    assertTrue(HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfiguration));
+  }
+
+  @Test
+  public void testTransformRecord() throws Exception {
+    LinkConfiguration linkConfiguration = new LinkConfiguration();
+    FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
+    ToJobConfiguration toJobConfiguration = new ToJobConfiguration();
+    final String record = "'Abe',\0,'test'";
+    final Object[] arrayRecord = new Object[]{
+        "'Abe'",
+        "\0",
+        "'test'"
+    };
+
+    // No transformations
+    assertArrayEquals(arrayRecord, HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, record));
+    assertEquals(record, HdfsUtils.formatRecord(linkConfiguration, toJobConfiguration, arrayRecord));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index b404c34..be57fa0 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -143,7 +143,64 @@ public class TestLoader extends TestHdfsBase {
     Assert.assertEquals(5, fs.listStatus(outputPath).length);
   }
 
-  private void verifyOutput(FileSystem fs, Path file) throws IOException {
+  @Test
+  public void testOverrideNull() throws Exception {
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
+      private long index = 0L;
+
+      @Override
+      public Object[] readArrayRecord() {
+        if (index++ < NUMBER_OF_ROWS_PER_FILE) {
+          return new Object[]{
+              index,
+              (double)index,
+              null,
+              "'" + index + "'"
+          };
+        } else {
+          return null;
+        }
+      }
+
+      @Override
+      public String readTextRecord() {
+        throw new AssertionError("should not be at readTextRecord");
+      }
+
+      @Override
+      public Object readContent() {
+        throw new AssertionError("should not be at readContent");
+      }
+    }, null);
+    LinkConfiguration linkConf = new LinkConfiguration();
+    ToJobConfiguration jobConf = new ToJobConfiguration();
+    jobConf.toJobConfig.outputDirectory = outputDirectory;
+    jobConf.toJobConfig.compression = compression;
+    jobConf.toJobConfig.outputFormat = outputFormat;
+    jobConf.toJobConfig.overrideNullValue = true;
+    jobConf.toJobConfig.nullValue = "\\N";
+    Path outputPath = new Path(outputDirectory);
+
+    loader.load(context, linkConf, jobConf);
+    Assert.assertEquals(1, fs.listStatus(outputPath).length);
+
+    for (FileStatus status : fs.listStatus(outputPath)) {
+      verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s");
+    }
+
+    loader.load(context, linkConf, jobConf);
+    Assert.assertEquals(2, fs.listStatus(outputPath).length);
+    loader.load(context, linkConf, jobConf);
+    loader.load(context, linkConf, jobConf);
+    loader.load(context, linkConf, jobConf);
+    Assert.assertEquals(5, fs.listStatus(outputPath).length);
+  }
+
+  private void verifyOutput(FileSystem fs, Path file, String format) throws IOException {
     Configuration conf = new Configuration();
     FSDataInputStream fsin = fs.open(file);
     CompressionCodec codec;
@@ -181,7 +238,7 @@ public class TestLoader extends TestHdfsBase {
         BufferedReader textReader = new BufferedReader(in);
 
         for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
-          Assert.assertEquals(i + "," + (double)i + ",'" + i + "'", textReader.readLine());
+          Assert.assertEquals(formatRow(format, i), textReader.readLine());
         }
         break;
 
@@ -208,10 +265,14 @@ public class TestLoader extends TestHdfsBase {
         Text line = new Text();
         int index = 1;
         while (sequenceReader.next(line)) {
-          Assert.assertEquals(index + "," + (double)index + ",'" + index++ + "'", line.toString());
+          Assert.assertEquals(formatRow(format, index++), line.toString());
           line = new Text();
         }
         break;
     }
   }
+
+  private void verifyOutput(FileSystem fs, Path file) throws IOException {
+    verifyOutput(fs, file, "%d,%f,%s");
+  }
 }
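A note for readers skimming the diff: the two new HdfsUtils.formatRecord overloads are symmetric. On extract, a field whose text equals fromJobConfig.nullValue is replaced with a real null before the record reaches the intermediate data format; on load, a null field is rendered as toJobConfig.nullValue before the line is written. Below is a minimal round-trip sketch; the driver class NullValueRoundTrip is hypothetical, and the configuration objects are instantiated directly the way the tests above do it rather than being supplied by the Sqoop framework.

import org.apache.sqoop.connector.hdfs.HdfsUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;

public class NullValueRoundTrip {
  public static void main(String[] args) {
    LinkConfiguration link = new LinkConfiguration();

    // Extract side: treat the literal "\N" in HDFS files as null.
    FromJobConfiguration from = new FromJobConfiguration();
    from.fromJobConfig.overrideNullValue = true;
    from.fromJobConfig.nullValue = "\\N";

    // "1,1.0,\N,'1'" -> [ "1", "1.0", null, "'1'" ]
    Object[] fields = HdfsUtils.formatRecord(link, from, "1,1.0,\\N,'1'");
    System.out.println(fields[2] == null);  // prints: true

    // Load side: write real nulls back out as "\N".
    ToJobConfiguration to = new ToJobConfiguration();
    to.toJobConfig.overrideNullValue = true;
    to.toJobConfig.nullValue = "\\N";

    // [ "1", "1.0", null, "'1'" ] -> "1,1.0,\N,'1'"
    System.out.println(HdfsUtils.formatRecord(link, to, fields));
  }
}

Note that both directions split and join on the hard-coded HdfsConstants.DEFAULT_FIELD_DELIMITER (','), with no escaping of delimiter characters inside field values; this change makes only the null representation configurable, not the delimiter itself.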
