Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 6e52d7fc8 -> 191e11464
CRUNCH-414: CSVLineReader-Threshold-Logic Signed-off-by: Micah Whitacre <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/191e1146 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/191e1146 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/191e1146 Branch: refs/heads/apache-crunch-0.8 Commit: 191e11464eb307198a7e507f49d4cd8d9099e608 Parents: 6e52d7f Author: Mac Champion <[email protected]> Authored: Fri Jun 27 16:00:48 2014 -0500 Committer: Micah Whitacre <[email protected]> Committed: Tue Jul 15 20:47:30 2014 -0500 ---------------------------------------------------------------------- .../crunch/io/text/csv/CSVFileSourceIT.java | 5 +- .../io/text/csv/CSVFileReaderFactory.java | 29 +++++---- .../crunch/io/text/csv/CSVFileSource.java | 67 ++++++++++++++------ .../crunch/io/text/csv/CSVInputFormat.java | 32 +++++++--- .../crunch/io/text/csv/CSVLineReader.java | 59 +++++++++++------ .../crunch/io/text/csv/CSVReadableData.java | 28 +++++--- .../crunch/io/text/csv/CSVRecordIterator.java | 21 +++--- .../crunch/io/text/csv/CSVRecordReader.java | 16 +++-- .../crunch/io/text/csv/CSVLineReaderTest.java | 28 ++++++-- 9 files changed, 197 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java index ba8e193..b1c247f 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java @@ -99,7 +99,7 @@ public class CSVFileSourceIT { final Pipeline pipeline = new 
MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines), CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*', '*', - CSVLineReader.DEFAULT_ESCAPE_CHARACTER)); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE)); final Collection<String> csvLinesList = csvLines.asCollection().getValue(); @@ -122,7 +122,8 @@ public class CSVFileSourceIT { final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines), - CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、')); + CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、', + CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE)); final Collection<String> csvLinesList = csvLines.asCollection().getValue(); for (int i = 0; i < expectedChineseLines.length; i++) { assertTrue(csvLinesList.contains(expectedChineseLines[i])); http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java index c1c687e..8d28439 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java @@ -35,11 +35,12 @@ import com.google.common.collect.Iterators; */ public class CSVFileReaderFactory implements FileReaderFactory<String> { private static final Log LOG = LogFactory.getLog(CSVFileReaderFactory.class); - private int bufferSize; - private String
inputFileEncoding; - private char openQuoteChar; - private char closeQuoteChar; - private char escapeChar; + private final int bufferSize; + private final String inputFileEncoding; + private final char openQuoteChar; + private final char closeQuoteChar; + private final char escapeChar; + private final int maximumRecordSize; /** * Creates a new {@code CSVFileReaderFactory} instance with default @@ -48,11 +49,11 @@ public class CSVFileReaderFactory implements FileReaderFactory<String> { CSVFileReaderFactory() { this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** - * Creates a new {@code CSVFileReaderFactory} instance with custon + * Creates a new {@code CSVFileReaderFactory} instance with custom * configuration * * @param bufferSize @@ -70,23 +71,29 @@ public class CSVFileReaderFactory implements FileReaderFactory<String> { * @param escapeChar * The character representing the escape character to be used in the * underlying {@code CSVLineReader} + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. 
*/ CSVFileReaderFactory(final int bufferSize, final String inputFileEncoding, final char openQuoteChar, - final char closeQuoteChar, final char escapeChar) { + final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { this.bufferSize = bufferSize; this.inputFileEncoding = inputFileEncoding; this.openQuoteChar = openQuoteChar; this.closeQuoteChar = closeQuoteChar; this.escapeChar = escapeChar; + this.maximumRecordSize = maximumRecordSize; } @Override - public Iterator<String> read(FileSystem fs, Path path) { + public Iterator<String> read(final FileSystem fs, final Path path) { FSDataInputStream is; try { is = fs.open(path); - return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); - } catch (IOException e) { + return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, + maximumRecordSize); + } catch (final IOException e) { LOG.info("Could not read path: " + path, e); return Iterators.emptyIterator(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java index 9021f78..d0a7631 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java @@ -66,11 +66,27 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou */ public static final String CSV_ESCAPE_CHAR = "csv.escapechar"; + /** + * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set + * the underlying {@code CSVLineReader}'s maximum record size. 
If this is not + * set, INPUT_SPLIT_SIZE will be checked first, and if that is not set, 64mb + * will be assumed. + */ + public static final String MAXIMUM_RECORD_SIZE = "csv.maximumrecordsize"; + + /** + * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set + * the underlying {@code CSVLineReader}'s input split size. If it is not set, + * 64mb will be assumed. + */ + public static final String INPUT_SPLIT_SIZE = "csv.inputsplitsize"; + private int bufferSize; private String inputFileEncoding; private char openQuoteChar; private char closeQuoteChar; private char escapeChar; + private int maximumRecordSize; /** * Create a new CSVFileSource instance @@ -78,10 +94,10 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou * @param path * The {@code Path} to the input data */ - public CSVFileSource(List<Path> paths) { + public CSVFileSource(final List<Path> paths) { this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -90,10 +106,10 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou * @param path * The {@code Path} to the input data */ - public CSVFileSource(Path path) { + public CSVFileSource(final Path path) { this(path, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -116,12 +132,16 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou * @param escapeChar * The character representing the escape character to be used in the * underlying {@code CSVLineReader} + * @param 
maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. */ - public CSVFileSource(List<Path> paths, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { + public CSVFileSource(final List<Path> paths, final int bufferSize, final String inputFileEncoding, + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { super(paths, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, - escapeChar)); - setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + escapeChar, maximumRecordSize)); + setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, maximumRecordSize); } /** @@ -144,23 +164,28 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou * @param escapeChar * The character representing the escape character to be used in the * underlying {@code CSVLineReader} + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. 
*/ - public CSVFileSource(Path path, final int bufferSize, final String inputFileEncoding, final char openQuoteChar, - final char closeQuoteChar, final char escapeChar) { + public CSVFileSource(final Path path, final int bufferSize, final String inputFileEncoding, final char openQuoteChar, + final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { super(path, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, - escapeChar)); - setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + escapeChar, maximumRecordSize)); + setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, maximumRecordSize); } @Override - public Iterable<String> read(Configuration conf) throws IOException { - return read(conf, - new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar)); + public Iterable<String> read(final Configuration conf) throws IOException { + return read(conf, new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, + escapeChar, maximumRecordSize)); } @Override public ReadableData<String> asReadable() { - return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, + maximumRecordSize); } @Override @@ -173,19 +198,20 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou * by {@code CSVInputFormat} */ private static FormatBundle<CSVInputFormat> getCSVBundle(final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { - FormatBundle<CSVInputFormat> bundle = FormatBundle.forInput(CSVInputFormat.class); + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) 
{ + final FormatBundle<CSVInputFormat> bundle = FormatBundle.forInput(CSVInputFormat.class); bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); bundle.set(CSV_BUFFER_SIZE, String.valueOf(bufferSize)); bundle.set(CSV_INPUT_FILE_ENCODING, String.valueOf(inputFileEncoding)); bundle.set(CSV_OPEN_QUOTE_CHAR, String.valueOf(openQuoteChar)); bundle.set(CSV_CLOSE_QUOTE_CHAR, String.valueOf(closeQuoteChar)); bundle.set(CSV_ESCAPE_CHAR, String.valueOf(escapeChar)); + bundle.set(MAXIMUM_RECORD_SIZE, String.valueOf(maximumRecordSize)); return bundle; } private void setPrivateVariables(final int bufferSize, final String inputFileEncoding, final char openQuoteChar, - final char closeQuoteChar, final char escapeChar) { + final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { if (isSameCharacter(openQuoteChar, escapeChar)) { throw new IllegalArgumentException("The open quote (" + openQuoteChar + ") and escape (" + escapeChar + ") characters must be different!"); @@ -199,8 +225,9 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou this.openQuoteChar = openQuoteChar; this.closeQuoteChar = closeQuoteChar; this.escapeChar = escapeChar; + this.maximumRecordSize = maximumRecordSize; } - + private boolean isSameCharacter(final char c1, final char c2) { return c2 == c1; } http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java index 867b704..8403f29 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java @@ -50,6 +50,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> 
implemen private char openQuoteChar; private char closeQuoteChar; private char escapeChar; + private int maximumRecordSize; private Configuration configuration; /** @@ -59,17 +60,20 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen * the {@link InputSplit} that will be assigned to the record reader * @param context * the {@TaskAttemptContext} for the job - * @return an instance of {@link CSVRecordReader} created using configured separator, quote, and escape characters. + * @return an instance of {@link CSVRecordReader} created using configured + * separator, quote, escape, and maximum record size. */ @Override public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split, final TaskAttemptContext context) { return new CSVRecordReader(this.bufferSize, this.inputFileEncoding, this.openQuoteChar, this.closeQuoteChar, - this.escapeChar); + this.escapeChar, this.maximumRecordSize); } /** * A method used by crunch to calculate the splits for each file. This will - * split each CSV file at the end of a valid CSV record. + * split each CSV file at the end of a valid CSV record. The default split + * size is 64mb, but this can be reconfigured by setting the + * "csv.inputsplitsize" option in the job configuration. * * @param job * the {@link JobContext} for the current job. 
@@ -79,10 +83,10 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen */ @Override public List<InputSplit> getSplits(final JobContext job) throws IOException { - final long splitSize = job.getConfiguration().getLong("csv.input.split.size", 67108864); + final long splitSize = job.getConfiguration().getLong(CSVFileSource.INPUT_SPLIT_SIZE, 67108864); final List<InputSplit> splits = new ArrayList<InputSplit>(); final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0])); - FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + final FileSystem fileSystem = FileSystem.get(job.getConfiguration()); FSDataInputStream inputStream = null; try { for (final Path path : paths) { @@ -91,7 +95,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen } return splits; } finally { - if(inputStream != null) { + if (inputStream != null) { inputStream.close(); } } @@ -135,13 +139,13 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen // we need to create a new CSVLineReader around the stream. inputStream.seek(currentPosition); final CSVLineReader csvLineReader = new CSVLineReader(inputStream, this.bufferSize, this.inputFileEncoding, - this.openQuoteChar, this.closeQuoteChar, this.escapeChar); + this.openQuoteChar, this.closeQuoteChar, this.escapeChar, this.maximumRecordSize); // This line is potentially garbage because we most likely just sought to // the middle of a line. Read the rest of the line and leave it for the // previous split. Then reset the multi-line CSV record boolean, because // the partial line will have a very high chance of falsely triggering the - // class wide multi-line logic. + // class-wide multi-line logic. 
currentPosition += csvLineReader.readFileLine(new Text()); csvLineReader.resetMultiLine(); @@ -182,6 +186,12 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen * {@link CSVFileSource}'s private getBundle() method */ public void configure() { + + bufferSize = this.configuration.getInt(CSVFileSource.CSV_BUFFER_SIZE, -1); + if (bufferSize < 0) { + bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE; + } + final String bufferValue = this.configuration.get(CSVFileSource.CSV_BUFFER_SIZE); if ("".equals(bufferValue)) { bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE; @@ -216,5 +226,11 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen } else { escapeChar = escapeCharValue.charAt(0); } + + maximumRecordSize = this.configuration.getInt(CSVFileSource.MAXIMUM_RECORD_SIZE, -1); + if (maximumRecordSize < 0) { + maximumRecordSize = this.configuration.getInt(CSVFileSource.INPUT_SPLIT_SIZE, + CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java index 83e2abe..79af67d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java @@ -29,6 +29,8 @@ import java.nio.charset.CharsetEncoder; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -38,6 +40,7 @@ import com.google.common.base.Preconditions; */ @ParametersAreNonnullByDefault public class CSVLineReader { + private static final Logger 
LOGGER = LoggerFactory.getLogger(CSVLineReader.class); // InputStream related variables /** @@ -74,6 +77,12 @@ public class CSVLineReader { * The default input file encoding to read with, UTF-8 */ public static final String DEFAULT_INPUT_FILE_ENCODING = "UTF-8"; + /** + * The default input maximum record size + */ + public static final int DEFAULT_MAXIMUM_RECORD_SIZE = 67108864; + + private final int maximumRecordSize; private final char openQuoteChar; private final char closeQuoteChar; private final char escape; @@ -96,7 +105,7 @@ public class CSVLineReader { */ public CSVLineReader(final InputStream inputStream) throws UnsupportedEncodingException { this(inputStream, DEFAULT_BUFFER_SIZE, DEFAULT_INPUT_FILE_ENCODING, DEFAULT_QUOTE_CHARACTER, - DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER); + DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER, DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -118,10 +127,13 @@ public class CSVLineReader { * Used to specify a custom close quote character * @param escape * Used to specify a custom escape character + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * parsing will stop and an exception will be thrown. 
* @throws UnsupportedEncodingException */ public CSVLineReader(final InputStream inputStream, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { Preconditions.checkNotNull(inputStream, "inputStream may not be null"); Preconditions.checkNotNull(inputFileEncoding, "inputFileEncoding may not be null"); if (bufferSize <= 0) { @@ -151,6 +163,7 @@ public class CSVLineReader { this.escape = escapeChar; this.inputFileEncoding = inputFileEncoding; this.charsetEncoder = Charset.forName(inputFileEncoding).newEncoder(); + this.maximumRecordSize = maximumRecordSize; } /** @@ -177,23 +190,30 @@ public class CSVLineReader { throw new RuntimeException("Cannot begin reading a CSV record while inside of a multi-line CSV record."); } - inputText.clear(); + final StringBuilder stringBuilder = new StringBuilder(); do { + // Read a line from the file and add it to the builder + inputText.clear(); totalBytesConsumed += readFileLine(inputText); - // a line has been read. We need to see if we're still in quotes and tack - // on a newline if so + stringBuilder.append(inputText.toString()); + if (currentlyInQuotes && !endOfFile) { - // Add one LF to mark the line return, otherwise any multi-line CSV - // record will all be on one line. - inputText.set(new StringBuilder().append(inputText.toString()).append('\n').toString()); + // If we end up in a multi-line record, we need append a newline + stringBuilder.append('\n'); + + // Do a check on the total bytes consumed to see if something has gone + // wrong. + if (totalBytesConsumed > maximumRecordSize || totalBytesConsumed > Integer.MAX_VALUE) { + final String record = stringBuilder.toString(); + LOGGER.error("Possibly malformed file encountered. 
First line of record: " + + record.substring(0, record.indexOf('\n'))); + throw new IOException("Possibly malformed file encountered. Check log statements for more information"); + } } - } while (currentlyInQuotes); + } while (currentlyInQuotes && !endOfFile); - if (totalBytesConsumed > Integer.MAX_VALUE) { - throw new IOException("Too many bytes consumed before newline: " + Integer.MAX_VALUE); - } - - input.set(inputText); + // Set the input to the multi-line record + input.set(stringBuilder.toString()); return (int) totalBytesConsumed; } @@ -222,8 +242,8 @@ public class CSVLineReader { } // This integer keeps track of the number of newline characters used to - // terminate the line being read. This - // could be 1, in the case of LF or CR, or 2, in the case of CRLF. + // terminate the line being read. This could be 1, in the case of LF or CR, + // or 2, in the case of CRLF. int newlineLength = 0; int inputTextLength = 0; long bytesConsumed = 0; @@ -245,8 +265,7 @@ public class CSVLineReader { newlineLength = 0; // Iterate through the buffer looking for newline characters while keeping - // track of if we're in a field - // and/or in quotes. + // track of if we're in a field and/or in quotes. for (; bufferPosition < bufferLength; ++bufferPosition) { bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]); if (buffer[bufferPosition] == this.escape) { @@ -271,8 +290,8 @@ public class CSVLineReader { if (lastCharWasCR && buffer[bufferPosition] == LF) { lastCharWasCR = false; // Check for LF (in case of CRLF line endings) and increment the - // counter, skip it by moving the - // buffer position, then record the length of the LF. + // counter, skip it by moving the buffer position, then record the + // length of the LF. 
++newlineLength; ++bufferPosition; bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]); http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java index 63e74d9..b266a08 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java @@ -25,11 +25,12 @@ import org.apache.hadoop.fs.Path; public class CSVReadableData extends ReadableDataImpl<String> { - private int bufferSize; - private String inputFileEncoding; - private char openQuoteChar; - private char closeQuoteChar; - private char escapeChar; + private final int bufferSize; + private final String inputFileEncoding; + private final char openQuoteChar; + private final char closeQuoteChar; + private final char escapeChar; + private final int maximumRecordSize; /** * Creates an instance of {@code CSVReadableData} with default configuration @@ -37,14 +38,15 @@ public class CSVReadableData extends ReadableDataImpl<String> { * @param paths * The paths of the files to be read */ - protected CSVReadableData(List<Path> paths) { + protected CSVReadableData(final List<Path> paths) { this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** * Creates an instance of {@code CSVReadableData} with specified configuration + * * @param paths * a list of input file paths * @param bufferSize @@ -57,19 +59,25 @@ public class CSVReadableData extends 
ReadableDataImpl<String> { * the character to use to close quote blocks * @param escape * the character to use for escaping control characters and quotes + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. */ - protected CSVReadableData(List<Path> paths, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { + protected CSVReadableData(final List<Path> paths, final int bufferSize, final String inputFileEncoding, + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { super(paths); this.bufferSize = bufferSize; this.inputFileEncoding = inputFileEncoding; this.openQuoteChar = openQuoteChar; this.closeQuoteChar = closeQuoteChar; this.escapeChar = escapeChar; + this.maximumRecordSize = maximumRecordSize; } @Override protected FileReaderFactory<String> getFileReaderFactory() { - return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, + maximumRecordSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java index df5e39c..447a6d0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java @@ -31,7 +31,7 @@ import com.google.common.io.Closeables; * An {@code Iterator} for an internally 
created {@code CSVLineReader} */ public class CSVRecordIterator implements Iterator<String>, Closeable { - private CSVLineReader csvLineReader; + private final CSVLineReader csvLineReader; private InputStream inputStream; private String currentLine; @@ -45,7 +45,7 @@ public class CSVRecordIterator implements Iterator<String>, Closeable { public CSVRecordIterator(final InputStream inputStream) throws UnsupportedEncodingException { this(inputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -63,12 +63,17 @@ public class CSVRecordIterator implements Iterator<String>, Closeable { * the character to use to close quote blocks * @param escape * the character to use for escaping control characters and quotes + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. 
* @throws UnsupportedEncodingException */ public CSVRecordIterator(final InputStream inputStream, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) throws UnsupportedEncodingException { + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) + throws UnsupportedEncodingException { csvLineReader = new CSVLineReader(inputStream, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, - escapeChar); + escapeChar, maximumRecordSize); this.inputStream = inputStream; incrementValue(); } @@ -84,7 +89,7 @@ public class CSVRecordIterator implements Iterator<String>, Closeable { @Override public String next() { - String result = currentLine; + final String result = currentLine; incrementValue(); return result; } @@ -95,13 +100,13 @@ public class CSVRecordIterator implements Iterator<String>, Closeable { } private void incrementValue() { - Text tempText = new Text(); + final Text tempText = new Text(); try { csvLineReader.readCSVLine(tempText); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException("A problem occurred accessing the underlying CSV file stream.", e); } - String tempTextAsString = tempText.toString(); + final String tempTextAsString = tempText.toString(); if ("".equals(tempTextAsString)) { currentLine = null; } else { http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java index d04da98..192a018 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java @@ -51,6 
+51,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> { private final char escape; private final String inputFileEncoding; private final int fileStreamBufferSize; + private final int maximumRecordSize; private int totalRecordsRead = 0; @@ -60,7 +61,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> { public CSVRecordReader() { this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -77,9 +78,13 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> { * the character to use to close quote blocks * @param escape * the character to use for escaping control characters and quotes + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. 
*/ public CSVRecordReader(final int bufferSize, final String inputFileEncoding, final char openQuote, - final char closeQuote, final char escape) { + final char closeQuote, final char escape, final int maximumRecordSize) { Preconditions.checkNotNull(openQuote, "quote cannot be null."); Preconditions.checkNotNull(closeQuote, "quote cannot be null."); Preconditions.checkNotNull(escape, "escape cannot be null."); @@ -89,6 +94,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> { this.openQuote = openQuote; this.closeQuote = closeQuote; this.escape = escape; + this.maximumRecordSize = maximumRecordSize; } /** @@ -116,12 +122,12 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> { LOGGER.info("Split starts at: " + start); LOGGER.info("Split will end at: " + end); - // Open the file, seek to the start of the split, then wrap it in a - // CSVLineReader + // Open the file, seek to the start of the split + // then wrap it in a CSVLineReader fileIn = file.getFileSystem(job).open(file); fileIn.seek(start); csvLineReader = new CSVLineReader(fileIn, this.fileStreamBufferSize, inputFileEncoding, this.openQuote, - this.closeQuote, this.escape); + this.closeQuote, this.escape, this.maximumRecordSize); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java index 55f2f1f..2c51c41 100644 --- a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java @@ -17,7 +17,7 @@ */ package org.apache.crunch.io.text.csv; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import 
java.io.File; import java.io.FileInputStream; @@ -36,7 +36,7 @@ public class CSVLineReaderTest { @Test public void testVariousUTF8Characters() throws IOException { final String variousCharacters = "â¬AbبÏи×¥£â¬Â¢â¡â¢â£â¤â¥â¦Â§â§â¨â©âªâ«ââ®æ¼¢Ã©óÃÃä"; - String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv"); + final String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv"); FileInputStream fileInputStream = null; try { @@ -60,12 +60,32 @@ public class CSVLineReaderTest { public void testBrokenLineParsingInChinese() throws IOException { final String[] expectedChineseLines = { "æ¨å¥½æå«é©¬å ï¼æä»äºæå·´é©¬å·æ¥ï¼ææ¯è½¯ä»¶å·¥ç¨å¸ï¼æäºåå «å²", "ææä¸ä¸ªå® ç©ï¼å®æ¯ä¸ä¸ªå°ç«ï¼å®å å²ï¼å®å¾æ¼äº®", "æå欢åé¥ï¼âæè§å¾è¿ä¸ªé¥æå¥½\nï¼èç³\nï¼å å\nï¼å°æ·æ·\nï¼å¤é âï¼ä»ä»¬é½å¾å¥½ï¼æä¹å¾åæ¬¢å¥¶é ªä½å®æ¯ä¸å¥åº·ç", "ææ¯ç·çï¼æç头åå¾çï¼æç©¿èè²ç裤åï¼âæç©¿é»è²çãâè¡£æâ" }; - String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); + final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); FileInputStream fileInputStream = null; try { fileInputStream = new FileInputStream(new File(chineseLines)); final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, - CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, 'â', 'â', 'ã'); + CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, 'â', 'â', 'ã', CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); + for (int i = 0; i < expectedChineseLines.length; i++) { + final Text readText = new Text(); + csvLineReader.readCSVLine(readText); + assertEquals(expectedChineseLines[i], readText.toString()); + } + } finally { + fileInputStream.close(); + } + } + + @Test(expected = IOException.class) + public void testMalformedBrokenLineParsingInChinese() throws IOException { + final String[] expectedChineseLines = { "æ¨å¥½æå«é©¬å ï¼æä»äºæå·´é©¬å·æ¥ï¼ææ¯è½¯ä»¶å·¥ç¨å¸ï¼æäºåå «å²", "ææä¸ä¸ªå® ç©ï¼å®æ¯ä¸ä¸ªå°ç«ï¼å®å å²ï¼å®å¾æ¼äº®", + "æå欢åé¥ï¼âæè§å¾è¿ä¸ªé¥æå¥½\nï¼èç³\nï¼å å\nï¼å°æ·æ·\nï¼å¤é 
âï¼ä»ä»¬é½å¾å¥½ï¼æä¹å¾åæ¬¢å¥¶é ªä½å®æ¯ä¸å¥åº·ç", "ææ¯ç·çï¼æç头åå¾çï¼æç©¿èè²ç裤åï¼âæç©¿é»è²çãâè¡£æâ" }; + final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); + FileInputStream fileInputStream = null; + try { + fileInputStream = new FileInputStream(new File(chineseLines)); + final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, + CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, 'â', 'â', 'ã', 5); for (int i = 0; i < expectedChineseLines.length; i++) { final Text readText = new Text(); csvLineReader.readCSVLine(readText);
