Repository: phoenix Updated Branches: refs/heads/master 22e27e5ed -> 9b285dae0
PHOENIX-1440 Support csv quoting in bulk loader Add support for quote and escape chars in map reduce bulk import. Original patch by Radu Marias, with very minor cleanup by Gabriel Reid. Signed-off-by: Gabriel Reid <gr...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9b285dae Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9b285dae Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9b285dae Branch: refs/heads/master Commit: 9b285dae032d1e61bb6a804b2357151d9d1905c5 Parents: 22e27e5 Author: Radu Marias <radumar...@gmail.com> Authored: Fri Nov 7 20:12:48 2014 +0200 Committer: Gabriel Reid <gr...@apache.org> Committed: Thu Nov 13 20:07:07 2014 +0100 ---------------------------------------------------------------------- .../phoenix/mapreduce/CsvBulkImportUtil.java | 10 ++++-- .../phoenix/mapreduce/CsvBulkLoadTool.java | 31 ++++++++++++++--- .../phoenix/mapreduce/CsvToKeyValueMapper.java | 35 ++++++++++++-------- .../mapreduce/CsvBulkImportUtilTest.java | 15 +++++---- .../mapreduce/CsvToKeyValueMapperTest.java | 30 ++++++++++++----- 5 files changed, 85 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java index c27e95f..e62cbb8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java @@ -17,12 +17,12 @@ */ package org.apache.phoenix.mapreduce; +import java.util.List; + import com.google.common.base.Preconditions; import 
org.apache.hadoop.conf.Configuration; import org.apache.phoenix.util.ColumnInfo; -import java.util.List; - /** * Collection of utility methods for setting up bulk import jobs. */ @@ -34,11 +34,13 @@ public class CsvBulkImportUtil { * @param conf job configuration to be set up * @param tableName name of the table to be imported to, can include a schema name * @param fieldDelimiter field delimiter character for the CSV input + * @param quoteChar quote character for the CSV input + * @param escapeChar escape character for the CSV input * @param arrayDelimiter array delimiter character, can be null * @param columnInfoList list of columns to be imported * @param ignoreInvalidRows flag to ignore invalid input rows */ - public static void initCsvImportJob(Configuration conf, String tableName, char fieldDelimiter, + public static void initCsvImportJob(Configuration conf, String tableName, char fieldDelimiter, char quoteChar, char escapeChar, String arrayDelimiter, List<ColumnInfo> columnInfoList, boolean ignoreInvalidRows) { Preconditions.checkNotNull(tableName); @@ -46,6 +48,8 @@ public class CsvBulkImportUtil { Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty"); conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName); conf.set(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, String.valueOf(fieldDelimiter)); + conf.set(CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, String.valueOf(quoteChar)); + conf.set(CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, String.valueOf(escapeChar)); if (arrayDelimiter != null) { conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java 
index e29405f..54e3f2c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -29,6 +29,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -66,10 +69,6 @@ import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; - /** * Base tool for running MapReduce-based ingests of data. */ @@ -85,6 +84,8 @@ public class CsvBulkLoadTool extends Configured implements Tool { static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)"); static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particular index table"); static final Option DELIMITER_OPT = new Option("d", "delimiter", true, "Input delimiter, defaults to comma"); + static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase delimiter, defaults to double quote character"); + static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape character, default is a backslash"); static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)"); static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported"); static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, 
"Ignore input errors"); @@ -144,6 +145,8 @@ public class CsvBulkLoadTool extends Configured implements Tool { options.addOption(OUTPUT_PATH_OPT); options.addOption(SCHEMA_NAME_OPT); options.addOption(DELIMITER_OPT); + options.addOption(QUOTE_OPT); + options.addOption(ESCAPE_OPT); options.addOption(ARRAY_DELIMITER_OPT); options.addOption(IMPORT_COLUMNS_OPT); options.addOption(IGNORE_ERRORS_OPT); @@ -323,6 +326,24 @@ public class CsvBulkLoadTool extends Configured implements Tool { delimiterChar = delimString.charAt(0); } + char quoteChar = '"'; + if (cmdLine.hasOption(QUOTE_OPT.getOpt())) { + String quoteString = cmdLine.getOptionValue(QUOTE_OPT.getOpt()); + if (quoteString.length() != 1) { + throw new IllegalArgumentException("Illegal quote character: " + quoteString); + } + quoteChar = quoteString.charAt(0); + } + + char escapeChar = '\\'; + if (cmdLine.hasOption(ESCAPE_OPT.getOpt())) { + String escapeString = cmdLine.getOptionValue(ESCAPE_OPT.getOpt()); + if (escapeString.length() != 1) { + throw new IllegalArgumentException("Illegal escape character: " + escapeString); + } + escapeChar = escapeString.charAt(0); + } + if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) { String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt()); LOG.info("Configuring ZK quorum to {}", zkQuorum); @@ -335,6 +356,8 @@ public class CsvBulkLoadTool extends Configured implements Tool { cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()), cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt())), delimiterChar, + quoteChar, + escapeChar, cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()), importColumns, cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index eb701c5..ead5067 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -28,6 +28,14 @@ import java.util.Properties; import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; @@ -50,15 +58,6 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - /** * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles. 
* <p/> @@ -79,6 +78,12 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes /** Configuration key for the field delimiter for input csv records */ public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter"; + /** Configuration key for the quote char for input csv records */ + public static final String QUOTE_CHAR_CONFKEY = "phoenix.mapreduce.import.quotechar"; + + /** Configuration key for the escape char for input csv records */ + public static final String ESCAPE_CHAR_CONFKEY = "phoenix.mapreduce.import.escapechar"; + /** Configuration key for the array element delimiter for input arrays */ public static final String ARRAY_DELIMITER_CONFKEY = "phoenix.mapreduce.import.arraydelimiter"; @@ -127,7 +132,8 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes upsertListener = new MapperUpsertListener( context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); csvUpsertExecutor = buildUpsertExecutor(conf); - csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0)); + csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0), + conf.get(ESCAPE_CHAR_CONFKEY).charAt(0)); preUpdateProcessor = loadPreUpsertProcessor(conf); if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){ @@ -298,11 +304,14 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes */ @VisibleForTesting static class CsvLineParser { - private final CSVFormat csvFormat; - CsvLineParser(char fieldDelimiter) { - this.csvFormat = CSVFormat.newFormat(fieldDelimiter); + CsvLineParser(char fieldDelimiter, char quote, char escape) { + this.csvFormat = CSVFormat.DEFAULT + .withIgnoreEmptyLines(true) + .withDelimiter(fieldDelimiter) + .withEscape(escape) + .withQuote(quote); } public CSVRecord parse(String input) throws IOException { 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java index 28b2b9a..6bf3e47 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java @@ -17,18 +17,17 @@ */ package org.apache.phoenix.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.util.List; - +import com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.util.ColumnInfo; import org.junit.Test; -import com.google.common.collect.ImmutableList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class CsvBulkImportUtilTest { @@ -38,12 +37,14 @@ public class CsvBulkImportUtilTest { String tableName = "SCHEMANAME.TABLENAME"; char delimiter = '!'; + char quote = '"'; + char escape = '\\'; List<ColumnInfo> columnInfoList = ImmutableList.of( new ColumnInfo("MYCOL", PDataType.INTEGER.getSqlType())); CsvBulkImportUtil.initCsvImportJob( - conf, tableName, delimiter, null, columnInfoList, true); + conf, tableName, delimiter, quote, escape, null, columnInfoList, true); assertEquals(tableName, conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY)); assertEquals("!", conf.get(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java index 889bf30..56a03e2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java @@ -17,12 +17,8 @@ */ package org.apache.phoenix.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.List; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.csv.CSVRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -31,14 +27,18 @@ import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.util.ColumnInfo; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class CsvToKeyValueMapperTest { @Test public void testCsvLineParser() throws IOException { - CsvToKeyValueMapper.CsvLineParser lineParser = new CsvToKeyValueMapper.CsvLineParser(';'); + CsvToKeyValueMapper.CsvLineParser lineParser = + new CsvToKeyValueMapper.CsvLineParser(';', '"', '\\'); CSVRecord parsed = lineParser.parse("one;two"); assertEquals("one", parsed.get(0)); @@ -47,6 +47,18 @@ public class CsvToKeyValueMapperTest { assertEquals(1, parsed.getRecordNumber()); } + @Test + public void testCsvLineParserWithQuoting() throws IOException { + CsvToKeyValueMapper.CsvLineParser lineParser = + new CsvToKeyValueMapper.CsvLineParser(';', '"', '\\'); + CSVRecord parsed = lineParser.parse("\"\\\"one\";\"\\;two\\\\\""); + + 
assertEquals("\"one", parsed.get(0)); + assertEquals(";two\\", parsed.get(1)); + assertTrue(parsed.isConsistent()); + assertEquals(1, parsed.getRecordNumber()); + } + @Test public void testBuildColumnInfoList() {