Repository: crunch Updated Branches: refs/heads/master abaa203b6 -> f8c98a6c6
CRUNCH-565_CSVInputFormat-Configuration-Defensiveness Signed-off-by: Micah Whitacre <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f8c98a6c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f8c98a6c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f8c98a6c Branch: refs/heads/master Commit: f8c98a6c69dcd1e971f2b35b94a2bb99a4233b4e Parents: abaa203 Author: Mac <[email protected]> Authored: Thu Oct 1 19:30:24 2015 -0500 Committer: Micah Whitacre <[email protected]> Committed: Mon Oct 5 13:59:32 2015 -0500 ---------------------------------------------------------------------- .../crunch/io/text/csv/CSVInputFormat.java | 122 +++++++------------ .../crunch/io/text/csv/CSVInputFormatTest.java | 89 ++++++++++++++ 2 files changed, 133 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f8c98a6c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java index 8403f29..2894686 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -45,17 +45,23 @@ import com.google.common.annotations.VisibleForTesting; * within fields which should all be treated as one record. */ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implements Configurable { - private int bufferSize; - private String inputFileEncoding; - private char openQuoteChar; - private char closeQuoteChar; - private char escapeChar; - private int maximumRecordSize; + @VisibleForTesting + protected int bufferSize; + @VisibleForTesting + protected String inputFileEncoding; + @VisibleForTesting + protected char openQuoteChar; + @VisibleForTesting + protected char closeQuoteChar; + @VisibleForTesting + protected char escapeChar; + @VisibleForTesting + protected int maximumRecordSize; private Configuration configuration; /** * This method is used by crunch to get an instance of {@link CSVRecordReader} - * + * * @param split * the {@link InputSplit} that will be assigned to the record reader * @param context @@ -74,7 +80,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen * split each CSV file at the end of a valid CSV record. The default split * size is 64mb, but this can be reconfigured by setting the * "csv.inputsplitsize" option in the job configuration. - * + * * @param job * the {@link JobContext} for the current job. * @return a List containing all of the calculated splits for a single file. 
@@ -101,6 +107,30 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen } } + @Override + public Configuration getConf() { + return configuration; + } + + @Override + public void setConf(final Configuration conf) { + configuration = conf; + configure(); + } + + /** + * This method will read the configuration options that were set in + * {@link CSVFileSource}'s private getBundle() method + */ + public void configure() { + inputFileEncoding = this.configuration.get(CSVFileSource.CSV_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING); + maximumRecordSize = this.configuration.getInt(CSVFileSource.MAXIMUM_RECORD_SIZE, this.configuration.getInt(CSVFileSource.INPUT_SPLIT_SIZE, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE)); + closeQuoteChar = this.configuration.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR, String.valueOf(CSVLineReader.DEFAULT_QUOTE_CHARACTER)).charAt(0); + openQuoteChar = this.configuration.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR, String.valueOf(CSVLineReader.DEFAULT_QUOTE_CHARACTER)).charAt(0); + escapeChar = this.configuration.get(CSVFileSource.CSV_ESCAPE_CHAR, String.valueOf(CSVLineReader.DEFAULT_ESCAPE_CHARACTER)).charAt(0); + bufferSize = this.configuration.getInt(CSVFileSource.CSV_BUFFER_SIZE, CSVLineReader.DEFAULT_BUFFER_SIZE); + } + /** * In summary, this method will start at the beginning of the file, seek to * the position corresponding to the desired split size, seek to the end of @@ -111,7 +141,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen */ @VisibleForTesting protected List<FileSplit> getSplitsForFile(final long splitSize, final long fileSize, final Path fileName, - final FSDataInputStream inputStream) throws IOException { + final FSDataInputStream inputStream) throws IOException { final List<FileSplit> splitsList = new ArrayList<FileSplit>(); long splitStart; @@ -130,7 +160,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen if 
(currentPosition >= fileSize) { currentPosition = fileSize; endOfFile = true; - final FileSplit fileSplit = new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[] {}); + final FileSplit fileSplit = new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[]{}); splitsList.add(fileSplit); break; } @@ -164,73 +194,9 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen // We're out of the multi-line CSV record, so it's safe to end the // previous split. - splitsList.add(new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[] {})); + splitsList.add(new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[]{})); } return splitsList; } - - @Override - public Configuration getConf() { - return configuration; - } - - @Override - public void setConf(final Configuration conf) { - configuration = conf; - configure(); - } - - /** - * This method will read the configuration options that were set in - * {@link CSVFileSource}'s private getBundle() method - */ - public void configure() { - - bufferSize = this.configuration.getInt(CSVFileSource.CSV_BUFFER_SIZE, -1); - if (bufferSize < 0) { - bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE; - } - - final String bufferValue = this.configuration.get(CSVFileSource.CSV_BUFFER_SIZE); - if ("".equals(bufferValue)) { - bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE; - } else { - bufferSize = Integer.parseInt(bufferValue); - } - - final String inputFileEncodingValue = this.configuration.get(CSVFileSource.CSV_INPUT_FILE_ENCODING); - if ("".equals(inputFileEncodingValue)) { - inputFileEncoding = CSVLineReader.DEFAULT_INPUT_FILE_ENCODING; - } else { - inputFileEncoding = inputFileEncodingValue; - } - - final String openQuoteCharValue = this.configuration.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR); - if ("".equals(openQuoteCharValue)) { - openQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER; - } else { - openQuoteChar = 
openQuoteCharValue.charAt(0); - } - - final String closeQuoteCharValue = this.configuration.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR); - if ("".equals(closeQuoteCharValue)) { - closeQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER; - } else { - closeQuoteChar = closeQuoteCharValue.charAt(0); - } - - final String escapeCharValue = this.configuration.get(CSVFileSource.CSV_ESCAPE_CHAR); - if ("".equals(escapeCharValue)) { - escapeChar = CSVLineReader.DEFAULT_ESCAPE_CHARACTER; - } else { - escapeChar = escapeCharValue.charAt(0); - } - - maximumRecordSize = this.configuration.getInt(CSVFileSource.MAXIMUM_RECORD_SIZE, -1); - if (maximumRecordSize < 0) { - maximumRecordSize = this.configuration.getInt(CSVFileSource.INPUT_SPLIT_SIZE, - CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f8c98a6c/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVInputFormatTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVInputFormatTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVInputFormatTest.java new file mode 100644 index 0000000..f9d60e9 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVInputFormatTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.text.csv; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class CSVInputFormatTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + private final Configuration configuration = new Configuration(); + private final CSVInputFormat csvInputFormat = new CSVInputFormat(); + + @After + public void clearConfiguration() { + configuration.clear(); + } + + @Test + public void testDefaultConfiguration() { + csvInputFormat.setConf(configuration); + csvInputFormat.configure(); + + Assert.assertEquals(CSVLineReader.DEFAULT_BUFFER_SIZE, csvInputFormat.bufferSize); + Assert.assertEquals(CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, csvInputFormat.inputFileEncoding); + Assert.assertEquals(CSVLineReader.DEFAULT_QUOTE_CHARACTER, csvInputFormat.openQuoteChar); + Assert.assertEquals(CSVLineReader.DEFAULT_QUOTE_CHARACTER, csvInputFormat.closeQuoteChar); + Assert.assertEquals(CSVLineReader.DEFAULT_ESCAPE_CHARACTER, csvInputFormat.escapeChar); + Assert.assertEquals(CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE, csvInputFormat.maximumRecordSize); + } + + @Test + public void testReasonableConfiguration() { + configuration.set(CSVFileSource.CSV_INPUT_FILE_ENCODING, "UTF8"); + configuration.set(CSVFileSource.CSV_CLOSE_QUOTE_CHAR, "C"); + configuration.set(CSVFileSource.CSV_OPEN_QUOTE_CHAR, "O"); + configuration.set(CSVFileSource.CSV_ESCAPE_CHAR, "E"); + 
configuration.setInt(CSVFileSource.CSV_BUFFER_SIZE, 1000); + configuration.setInt(CSVFileSource.MAXIMUM_RECORD_SIZE, 10001); + csvInputFormat.setConf(configuration); + csvInputFormat.configure(); + + Assert.assertEquals(1000, csvInputFormat.bufferSize); + Assert.assertEquals("UTF8", csvInputFormat.inputFileEncoding); + Assert.assertEquals('O', csvInputFormat.openQuoteChar); + Assert.assertEquals('C', csvInputFormat.closeQuoteChar); + Assert.assertEquals('E', csvInputFormat.escapeChar); + Assert.assertEquals(10001, csvInputFormat.maximumRecordSize); + } + + @Test + public void testMaximumRecordSizeFallbackConfiguration() { + configuration.set(CSVFileSource.CSV_INPUT_FILE_ENCODING, "UTF8"); + configuration.set(CSVFileSource.CSV_CLOSE_QUOTE_CHAR, "C"); + configuration.set(CSVFileSource.CSV_OPEN_QUOTE_CHAR, "O"); + configuration.set(CSVFileSource.CSV_ESCAPE_CHAR, "E"); + configuration.setInt(CSVFileSource.CSV_BUFFER_SIZE, 1000); + configuration.setInt(CSVFileSource.INPUT_SPLIT_SIZE, 10002); + csvInputFormat.setConf(configuration); + csvInputFormat.configure(); + + Assert.assertEquals(1000, csvInputFormat.bufferSize); + Assert.assertEquals("UTF8", csvInputFormat.inputFileEncoding); + Assert.assertEquals('O', csvInputFormat.openQuoteChar); + Assert.assertEquals('C', csvInputFormat.closeQuoteChar); + Assert.assertEquals('E', csvInputFormat.escapeChar); + Assert.assertEquals(10002, csvInputFormat.maximumRecordSize); + } +} \ No newline at end of file
