I'm importing a cvs file which has values as double-quoted and with comma (as the fields separator) also in the value of a field, something like this:
*"1","Charlie's Auto Body, \" Inc.","N","1-1"* The first issue is that first field which is a number cannot be parse like one because of the quote. If I remove the quotes from csv file than comma from the value field is treated as field separator and I get one more extra field, which causes the import to fail. This is because the cvs parser don't parse it from double-quoted representation, where it should handle the comma as part of the value. The PSQL import handles such cases. I know I can change the comma separator in my files to something else but it not the best approach and also you need to first pre process the data to find an unused character to use. So, I started to look in the code and I come with a fix, attached patch. Please share your thoughts if you think this would be a useful feature and I can create a jira issue with the patch. The fix includes support to specify quote and escape chars as arguments. -- And in the end, it's not the years in your life that count. It's the life in your years.
From e90272e7eebf27ef1bb543fae1f57b0b30a4f0cb Mon Sep 17 00:00:00 2001 From: Radu Marias <[email protected]> Date: Fri, 7 Nov 2014 20:12:48 +0200 Subject: [PATCH] Add support for quote and escape chars in map reduce bulk import. --- .../phoenix/mapreduce/CsvBulkImportUtil.java | 6 +- .../apache/phoenix/mapreduce/CsvBulkLoadTool.java | 67 +++++++++++++--------- .../phoenix/mapreduce/CsvToKeyValueMapper.java | 57 ++++++++++-------- .../phoenix/mapreduce/CsvBulkImportUtilTest.java | 15 ++--- .../phoenix/mapreduce/CsvToKeyValueMapperTest.java | 23 ++++---- 5 files changed, 98 insertions(+), 70 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java index c27e95f..43e7bb1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java @@ -34,11 +34,13 @@ public class CsvBulkImportUtil { * @param conf job configuration to be set up * @param tableName name of the table to be imported to, can include a schema name * @param fieldDelimiter field delimiter character for the CSV input + * @param quoteChar quote character for the CSV input + * @param escapeChar escape character for the CSV input * @param arrayDelimiter array delimiter character, can be null * @param columnInfoList list of columns to be imported * @param ignoreInvalidRows flag to ignore invalid input rows */ - public static void initCsvImportJob(Configuration conf, String tableName, char fieldDelimiter, + public static void initCsvImportJob(Configuration conf, String tableName, char fieldDelimiter, char quoteChar, char escapeChar, String arrayDelimiter, List<ColumnInfo> columnInfoList, boolean ignoreInvalidRows) { Preconditions.checkNotNull(tableName); @@ -46,6 +48,8 @@ public class CsvBulkImportUtil { Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty"); conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName); conf.set(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, String.valueOf(fieldDelimiter)); + conf.set(CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, String.valueOf(quoteChar)); + conf.set(CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, String.valueOf(escapeChar)); if (arrayDelimiter != null) { conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java index e29405f..0b0e3cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -17,25 +17,10 @@ */ package org.apache.phoenix.mapreduce; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import org.apache.commons.cli.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -58,17 +43,21 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.job.JobManager; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.CSVCommonsLoader; -import org.apache.phoenix.util.ColumnInfo; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * Base tool for running MapReduce-based ingests of data. @@ -85,6 +74,8 @@ public class CsvBulkLoadTool extends Configured implements Tool { static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)"); static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particualar index table"); static final Option DELIMITER_OPT = new Option("d", "delimiter", true, "Input delimiter, defaults to comma"); + static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase delimiter, defaults to double quote character"); + static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape character, default is a backslash"); static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)"); static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported"); static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors"); @@ -144,6 +135,8 @@ public class CsvBulkLoadTool extends Configured implements Tool { options.addOption(OUTPUT_PATH_OPT); options.addOption(SCHEMA_NAME_OPT); options.addOption(DELIMITER_OPT); + options.addOption(QUOTE_OPT); + options.addOption(ESCAPE_OPT); options.addOption(ARRAY_DELIMITER_OPT); options.addOption(IMPORT_COLUMNS_OPT); options.addOption(IGNORE_ERRORS_OPT); @@ -323,6 +316,24 @@ public class CsvBulkLoadTool extends Configured implements Tool { delimiterChar = delimString.charAt(0); } + char quoteChar = '"'; + if (cmdLine.hasOption(QUOTE_OPT.getOpt())) { + String quoteString = cmdLine.getOptionValue(QUOTE_OPT.getOpt()); + if (quoteString.length() != 1) { + throw new IllegalArgumentException("Illegal quote character: " + quoteString); + } + quoteChar = quoteString.charAt(0); + } + + char escapeChar = '\\'; + if (cmdLine.hasOption(ESCAPE_OPT.getOpt())) { + String escapeString = cmdLine.getOptionValue(ESCAPE_OPT.getOpt()); + if (escapeString.length() != 1) { + throw new IllegalArgumentException("Illegal escape character: " + escapeString); + } + escapeChar = escapeString.charAt(0); + } + if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) { String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt()); LOG.info("Configuring ZK quorum to {}", zkQuorum); @@ -335,6 +346,8 @@ public class CsvBulkLoadTool extends Configured implements Tool { cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()), cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt())), delimiterChar, + quoteChar, + escapeChar, cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()), importColumns, cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt())); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index eb701c5..4dc815a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -17,17 +17,14 @@ */ package org.apache.phoenix.mapreduce; -import java.io.IOException; -import java.io.StringReader; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.Properties; - -import javax.annotation.Nullable; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; @@ -50,14 +47,15 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.StringReader; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; /** * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles. @@ -79,6 +77,12 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes /** Configuration key for the field delimiter for input csv records */ public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter"; + /** Configuration key for the quote char for input csv records */ + public static final String QUOTE_CHAR_CONFKEY = "phoenix.mapreduce.import.quotechar"; + + /** Configuration key for the escape char for input csv records */ + public static final String ESCAPE_CHAR_CONFKEY = "phoenix.mapreduce.import.escapechar"; + /** Configuration key for the array element delimiter for input arrays */ public static final String ARRAY_DELIMITER_CONFKEY = "phoenix.mapreduce.import.arraydelimiter"; @@ -127,7 +131,9 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes upsertListener = new MapperUpsertListener( context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); csvUpsertExecutor = buildUpsertExecutor(conf); - csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0)); +// csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0)); + csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0), + conf.get(ESCAPE_CHAR_CONFKEY).charAt(0)); preUpdateProcessor = loadPreUpsertProcessor(conf); if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){ @@ -298,11 +304,16 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes */ @VisibleForTesting static class CsvLineParser { - private final CSVFormat csvFormat; - CsvLineParser(char fieldDelimiter) { - this.csvFormat = CSVFormat.newFormat(fieldDelimiter); + CsvLineParser(char fieldDelimiter, char quote, char escape) { + this.csvFormat = CSVFormat.DEFAULT + .withIgnoreEmptyLines(true) + .withDelimiter(fieldDelimiter) + .withEscape(escape) + .withQuote(quote) +// .withSkipHeaderRecord(true) + ; } public CSVRecord parse(String input) throws IOException { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java index 28b2b9a..6bf3e47 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java @@ -17,18 +17,17 @@ */ package org.apache.phoenix.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.util.List; - +import com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.util.ColumnInfo; import org.junit.Test; -import com.google.common.collect.ImmutableList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class CsvBulkImportUtilTest { @@ -38,12 +37,14 @@ public class CsvBulkImportUtilTest { String tableName = "SCHEMANAME.TABLENAME"; char delimiter = '!'; + char quote = '"'; + char escape = '\\'; List<ColumnInfo> columnInfoList = ImmutableList.of( new ColumnInfo("MYCOL", PDataType.INTEGER.getSqlType())); CsvBulkImportUtil.initCsvImportJob( - conf, tableName, delimiter, null, columnInfoList, true); + conf, tableName, delimiter, quote, escape, null, columnInfoList, true); assertEquals(tableName, conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY)); assertEquals("!", conf.get(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY)); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java index 889bf30..a8f4c1b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java @@ -17,12 +17,8 @@ */ package org.apache.phoenix.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.List; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.csv.CSVRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -31,18 +27,21 @@ import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.util.ColumnInfo; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class CsvToKeyValueMapperTest { @Test public void testCsvLineParser() throws IOException { - CsvToKeyValueMapper.CsvLineParser lineParser = new CsvToKeyValueMapper.CsvLineParser(';'); - CSVRecord parsed = lineParser.parse("one;two"); + CsvToKeyValueMapper.CsvLineParser lineParser = new CsvToKeyValueMapper.CsvLineParser(';', '"', '\\'); + CSVRecord parsed = lineParser.parse("\"\\\"one\";\"\\;two\\\\\""); - assertEquals("one", parsed.get(0)); - assertEquals("two", parsed.get(1)); + assertEquals("\"one", parsed.get(0)); + assertEquals(";two\\", parsed.get(1)); assertTrue(parsed.isConsistent()); assertEquals(1, parsed.getRecordNumber()); } -- 2.1.3
