Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 5b82e2612 -> 8c7cd4ac5
CRUNCH-429: Fix CSVInputFormat Signed-off-by: Micah Whitacre <[email protected]> CRUNCH-429: Fixed spelling error, handled potential NPE, and moved FileSystem retrieval outside of for loop. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8c7cd4ac Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8c7cd4ac Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8c7cd4ac Branch: refs/heads/apache-crunch-0.8 Commit: 8c7cd4ac5e699f2c7065d7956737ef277e313259 Parents: 5b82e26 Author: Mac Champion <[email protected]> Authored: Tue Jun 24 12:30:34 2014 -0500 Committer: Micah Whitacre <[email protected]> Committed: Thu Jun 26 20:32:56 2014 -0500 ---------------------------------------------------------------------- .../crunch/io/text/csv/CSVFileSourceIT.java | 66 ++++++++++++-------- .../crunch/io/text/csv/CSVInputFormat.java | 47 ++++++++------ 2 files changed, 66 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8c7cd4ac/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java index a81f78d..ba8e193 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java @@ -17,19 +17,18 @@ */ package org.apache.crunch.io.text.csv; -import java.io.File; -import java.io.FileInputStream; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.Collection; -import static org.junit.Assert.*; import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; import org.junit.Rule; import org.junit.Test; @@ -39,14 +38,30 @@ public class CSVFileSourceIT { @Test public void testVanillaCSV() throws Exception { - String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" }; + final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" }; + + final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv"); + final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); + final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile))); + + final Collection<String> csvLinesList = csvLines.asCollection().getValue(); + + for (int i = 0; i < expectedFileContents.length; i++) { + assertTrue(csvLinesList.contains(expectedFileContents[i])); + } + } + + @Test + public void testVanillaCSVWithAdditionalActions() throws Exception { + final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" }; - String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv"); - Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); - PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile))); - pipeline.run(); + final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv"); + final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); + final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile))); - Collection<String> csvLinesList = csvLines.asCollection().getValue(); + final PTable<String, Long> countTable = csvLines.count(); + final PCollection<String> csvLines2 = countTable.keys(); + final Collection<String> csvLinesList = csvLines2.asCollection().getValue(); for (int i = 0; i < expectedFileContents.length; i++) { assertTrue(csvLinesList.contains(expectedFileContents[i])); @@ -55,16 +70,15 @@ public class CSVFileSourceIT { @Test public void testCSVWithNewlines() throws Exception { - String[] expectedFileContents = { + final String[] expectedFileContents = { "\"Champion, Mac\",\"1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086\",\"30\",\"M\",\"5/28/2010 12:00:00 AM\",\"Just some guy\"", "\"Champion, Mac\",\"5678 Tatooine Rd. Apt 5, Mobile, AL 36608\",\"30\",\"M\",\"Some other date\",\"short description\"" }; - String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv"); - Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); - PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines))); - pipeline.run(); + final String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv"); + final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); + final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines))); - Collection<String> csvLinesList = csvLines.asCollection().getValue(); + final Collection<String> csvLinesList = csvLines.asCollection().getValue(); for (int i = 0; i < expectedFileContents.length; i++) { assertTrue(csvLinesList.contains(expectedFileContents[i])); @@ -77,18 +91,17 @@ public class CSVFileSourceIT { */ @Test public void testCSVWithCustomQuoteAndNewlines() throws IOException { - String[] expectedFileContents = { + final String[] expectedFileContents = { "*Champion, Mac*,*1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086*,*30*,*M*,*5/28/2010 12:00:00 AM*,*Just some guy*", "*Mac, Champion*,*5678 Tatooine Rd. Apt 5, Mobile, AL 36608*,*30*,*M*,*Some other date*,*short description*" }; - String csvWithNewlines = tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv"); - Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); - PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines), + final String csvWithNewlines = tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv"); + final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); + final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines), CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*', '*', CSVLineReader.DEFAULT_ESCAPE_CHARACTER)); - pipeline.run(); - Collection<String> csvLinesList = csvLines.asCollection().getValue(); + final Collection<String> csvLinesList = csvLines.asCollection().getValue(); for (int i = 0; i < expectedFileContents.length; i++) { assertTrue(csvLinesList.contains(expectedFileContents[i])); @@ -105,13 +118,12 @@ public class CSVFileSourceIT { public void testBrokenLineParsingInChinese() throws IOException { final String[] expectedChineseLines = { "æ¨å¥½æå«é©¬å ï¼æä»äºæå·´é©¬å·æ¥ï¼ææ¯è½¯ä»¶å·¥ç¨å¸ï¼æäºåå «å²", "ææä¸ä¸ªå® ç©ï¼å®æ¯ä¸ä¸ªå°ç«ï¼å®å å²ï¼å®å¾æ¼äº®", "æå欢åé¥ï¼âæè§å¾è¿ä¸ªé¥æå¥½\nï¼èç³\nï¼å å\nï¼å°æ·æ·\nï¼å¤é âï¼ä»ä»¬é½å¾å¥½ï¼æä¹å¾åæ¬¢å¥¶é ªä½å®æ¯ä¸å¥åº·ç", "ææ¯ç·çï¼æç头åå¾çï¼æç©¿èè²ç裤åï¼âæç©¿é»è²çãâè¡£æâ" }; - String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); + final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); - Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); - PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines), + final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); + final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines), CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, 'â', 'â', 'ã')); - pipeline.run(); - Collection<String> csvLinesList = csvLines.asCollection().getValue(); + final Collection<String> csvLinesList = csvLines.asCollection().getValue(); for (int i = 0; i < expectedChineseLines.length; i++) { assertTrue(csvLinesList.contains(expectedChineseLines[i])); } http://git-wip-us.apache.org/repos/asf/crunch/blob/8c7cd4ac/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java index 5d5abe3..867b704 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -28,7 +30,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; @@ -43,12 +44,13 @@ import com.google.common.annotations.VisibleForTesting; * format deals with the fact that CSV files can potentially have multiple lines * within fields which should all be treated as one record. */ -public class CSVInputFormat extends FileInputFormat<LongWritable, Text> { +public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implements Configurable { private int bufferSize; private String inputFileEncoding; private char openQuoteChar; private char closeQuoteChar; private char escapeChar; + private Configuration configuration; /** * This method is used by crunch to get an instance of {@link CSVRecordReader} @@ -57,10 +59,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> { * the {@link InputSplit} that will be assigned to the record reader * @param context * the {@TaskAttemptContext} for the job - * @return an instance of {@link CSVRecordReader} created using - * {@link CSVInputFormat#getSeparatorChar()}, - * {@link CSVInputFormat#getQuoteChar()}, and - * {@link CSVInputFormat#getEscapeChar()}. + * @return an instance of {@link CSVRecordReader} created using configured separator, quote, and escape characters. */ @Override public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split, final TaskAttemptContext context) { @@ -83,17 +82,18 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> { final long splitSize = job.getConfiguration().getLong("csv.input.split.size", 67108864); final List<InputSplit> splits = new ArrayList<InputSplit>(); final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0])); - FileSystem fileSystem = null; + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); FSDataInputStream inputStream = null; try { for (final Path path : paths) { - fileSystem = path.getFileSystem(job.getConfiguration()); inputStream = fileSystem.open(path); splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream)); } return splits; } finally { - inputStream.close(); + if(inputStream != null) { + inputStream.close(); + } } } @@ -166,44 +166,51 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> { return splitsList; } + @Override + public Configuration getConf() { + return configuration; + } + + @Override + public void setConf(final Configuration conf) { + configuration = conf; + configure(); + } + /** - * This method will read the configuration that is set in + * This method will read the configuration options that were set in * {@link CSVFileSource}'s private getBundle() method - * - * @param jobConf - * The {@code JobConf} instance from which the CSV configuration - * parameters will be read, if necessary. */ - public void configure(JobConf jobConf) { - String bufferValue = jobConf.get(CSVFileSource.CSV_BUFFER_SIZE); + public void configure() { + final String bufferValue = this.configuration.get(CSVFileSource.CSV_BUFFER_SIZE); if ("".equals(bufferValue)) { bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE; } else { bufferSize = Integer.parseInt(bufferValue); } - String inputFileEncodingValue = jobConf.get(CSVFileSource.CSV_INPUT_FILE_ENCODING); + final String inputFileEncodingValue = this.configuration.get(CSVFileSource.CSV_INPUT_FILE_ENCODING); if ("".equals(inputFileEncodingValue)) { inputFileEncoding = CSVLineReader.DEFAULT_INPUT_FILE_ENCODING; } else { inputFileEncoding = inputFileEncodingValue; } - String openQuoteCharValue = jobConf.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR); + final String openQuoteCharValue = this.configuration.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR); if ("".equals(openQuoteCharValue)) { openQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER; } else { openQuoteChar = openQuoteCharValue.charAt(0); } - String closeQuoteCharValue = jobConf.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR); + final String closeQuoteCharValue = this.configuration.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR); if ("".equals(closeQuoteCharValue)) { closeQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER; } else { closeQuoteChar = closeQuoteCharValue.charAt(0); } - String escapeCharValue = jobConf.get(CSVFileSource.CSV_ESCAPE_CHAR); + final String escapeCharValue = this.configuration.get(CSVFileSource.CSV_ESCAPE_CHAR); if ("".equals(escapeCharValue)) { escapeChar = CSVLineReader.DEFAULT_ESCAPE_CHARACTER; } else {
