Repository: phoenix
Updated Branches:
  refs/heads/master 22e27e5ed -> 9b285dae0


PHOENIX-1440 Support csv quoting in bulk loader

Add support for quote and escape chars in MapReduce bulk import.

Original patch by Radu Marias, with very minor cleanup by
Gabriel Reid.

Signed-off-by: Gabriel Reid <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9b285dae
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9b285dae
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9b285dae

Branch: refs/heads/master
Commit: 9b285dae032d1e61bb6a804b2357151d9d1905c5
Parents: 22e27e5
Author: Radu Marias <radumar...@gmail.com>
Authored: Fri Nov 7 20:12:48 2014 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Thu Nov 13 20:07:07 2014 +0100

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkImportUtil.java    | 10 ++++--
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 31 ++++++++++++++---
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  | 35 ++++++++++++--------
 .../mapreduce/CsvBulkImportUtilTest.java        | 15 +++++----
 .../mapreduce/CsvToKeyValueMapperTest.java      | 30 ++++++++++++-----
 5 files changed, 85 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index c27e95f..e62cbb8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -17,12 +17,12 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.util.List;
+
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.util.ColumnInfo;
 
-import java.util.List;
-
 /**
  * Collection of utility methods for setting up bulk import jobs.
  */
@@ -34,11 +34,13 @@ public class CsvBulkImportUtil {
      * @param conf job configuration to be set up
      * @param tableName name of the table to be imported to, can include a 
schema name
      * @param fieldDelimiter field delimiter character for the CSV input
+     * @param quoteChar quote character for the CSV input
+     * @param escapeChar escape character for the CSV input
      * @param arrayDelimiter array delimiter character, can be null
      * @param columnInfoList list of columns to be imported
      * @param ignoreInvalidRows flag to ignore invalid input rows
      */
-    public static void initCsvImportJob(Configuration conf, String tableName, 
char fieldDelimiter,
+    public static void initCsvImportJob(Configuration conf, String tableName, 
char fieldDelimiter, char quoteChar, char escapeChar,
             String arrayDelimiter,  List<ColumnInfo> columnInfoList, boolean 
ignoreInvalidRows) {
 
         Preconditions.checkNotNull(tableName);
@@ -46,6 +48,8 @@ public class CsvBulkImportUtil {
         Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info 
list is empty");
         conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
         conf.set(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, 
String.valueOf(fieldDelimiter));
+        conf.set(CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, 
String.valueOf(quoteChar));
+        conf.set(CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, 
String.valueOf(escapeChar));
         if (arrayDelimiter != null) {
             conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, 
arrayDelimiter);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index e29405f..54e3f2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -29,6 +29,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -66,10 +69,6 @@ import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-
 /**
  * Base tool for running MapReduce-based ingests of data.
  */
@@ -85,6 +84,8 @@ public class CsvBulkLoadTool extends Configured implements 
Tool {
     static final Option TABLE_NAME_OPT = new Option("t", "table", true, 
"Phoenix table name (mandatory)");
     static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", 
true, "Phoenix index table name when just loading this particualar index 
table");
     static final Option DELIMITER_OPT = new Option("d", "delimiter", true, 
"Input delimiter, defaults to comma");
+    static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a 
custom phrase delimiter, defaults to double quote character");
+    static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a 
custom escape character, default is a backslash");
     static final Option ARRAY_DELIMITER_OPT = new Option("a", 
"array-delimiter", true, "Array element delimiter (optional)");
     static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", 
true, "Comma-separated list of columns to be imported");
     static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", 
false, "Ignore input errors");
@@ -144,6 +145,8 @@ public class CsvBulkLoadTool extends Configured implements 
Tool {
         options.addOption(OUTPUT_PATH_OPT);
         options.addOption(SCHEMA_NAME_OPT);
         options.addOption(DELIMITER_OPT);
+        options.addOption(QUOTE_OPT);
+        options.addOption(ESCAPE_OPT);
         options.addOption(ARRAY_DELIMITER_OPT);
         options.addOption(IMPORT_COLUMNS_OPT);
         options.addOption(IGNORE_ERRORS_OPT);
@@ -323,6 +326,24 @@ public class CsvBulkLoadTool extends Configured implements 
Tool {
             delimiterChar = delimString.charAt(0);
         }
 
+        char quoteChar = '"';
+        if (cmdLine.hasOption(QUOTE_OPT.getOpt())) {
+            String quoteString = cmdLine.getOptionValue(QUOTE_OPT.getOpt());
+            if (quoteString.length() != 1) {
+                throw new IllegalArgumentException("Illegal quote character: " 
+ quoteString);
+            }
+            quoteChar = quoteString.charAt(0);
+        }
+
+        char escapeChar = '\\';
+        if (cmdLine.hasOption(ESCAPE_OPT.getOpt())) {
+            String escapeString = cmdLine.getOptionValue(ESCAPE_OPT.getOpt());
+            if (escapeString.length() != 1) {
+                throw new IllegalArgumentException("Illegal escape character: 
" + escapeString);
+            }
+            escapeChar = escapeString.charAt(0);
+        }
+
         if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
             String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
             LOG.info("Configuring ZK quorum to {}", zkQuorum);
@@ -335,6 +356,8 @@ public class CsvBulkLoadTool extends Configured implements 
Tool {
                         cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()),
                         cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt())),
                 delimiterChar,
+                quoteChar,
+                escapeChar,
                 cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()),
                 importColumns,
                 cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index eb701c5..ead5067 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -28,6 +28,14 @@ import java.util.Properties;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
@@ -50,15 +58,6 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
 /**
  * MapReduce mapper that converts CSV input lines into KeyValues that can be 
written to HFiles.
  * <p/>
@@ -79,6 +78,12 @@ public class CsvToKeyValueMapper extends 
Mapper<LongWritable,Text,ImmutableBytes
     /** Configuration key for the field delimiter for input csv records */
     public static final String FIELD_DELIMITER_CONFKEY = 
"phoenix.mapreduce.import.fielddelimiter";
 
+    /** Configuration key for the quote char for input csv records */
+    public static final String QUOTE_CHAR_CONFKEY = 
"phoenix.mapreduce.import.quotechar";
+
+    /** Configuration key for the escape char for input csv records */
+    public static final String ESCAPE_CHAR_CONFKEY = 
"phoenix.mapreduce.import.escapechar";
+
     /** Configuration key for the array element delimiter for input arrays */
     public static final String ARRAY_DELIMITER_CONFKEY = 
"phoenix.mapreduce.import.arraydelimiter";
 
@@ -127,7 +132,8 @@ public class CsvToKeyValueMapper extends 
Mapper<LongWritable,Text,ImmutableBytes
         upsertListener = new MapperUpsertListener(
                 context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
         csvUpsertExecutor = buildUpsertExecutor(conf);
-        csvLineParser = new 
CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0));
+        csvLineParser = new 
CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), 
conf.get(QUOTE_CHAR_CONFKEY).charAt(0),
+                conf.get(ESCAPE_CHAR_CONFKEY).charAt(0));
 
         preUpdateProcessor = loadPreUpsertProcessor(conf);
         if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, 
"").isEmpty()){
@@ -298,11 +304,14 @@ public class CsvToKeyValueMapper extends 
Mapper<LongWritable,Text,ImmutableBytes
      */
     @VisibleForTesting
     static class CsvLineParser {
-
         private final CSVFormat csvFormat;
 
-        CsvLineParser(char fieldDelimiter) {
-            this.csvFormat = CSVFormat.newFormat(fieldDelimiter);
+        CsvLineParser(char fieldDelimiter, char quote, char escape) {
+            this.csvFormat = CSVFormat.DEFAULT
+                    .withIgnoreEmptyLines(true)
+                    .withDelimiter(fieldDelimiter)
+                    .withEscape(escape)
+                    .withQuote(quote);
         }
 
         public CSVRecord parse(String input) throws IOException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index 28b2b9a..6bf3e47 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,18 +17,17 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.List;
-
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class CsvBulkImportUtilTest {
 
@@ -38,12 +37,14 @@ public class CsvBulkImportUtilTest {
 
         String tableName = "SCHEMANAME.TABLENAME";
         char delimiter = '!';
+        char quote = '"';
+        char escape = '\\';
 
         List<ColumnInfo> columnInfoList = ImmutableList.of(
                 new ColumnInfo("MYCOL", PDataType.INTEGER.getSqlType()));
 
         CsvBulkImportUtil.initCsvImportJob(
-                conf, tableName, delimiter, null, columnInfoList, true);
+                conf, tableName, delimiter, quote, escape, null, 
columnInfoList, true);
 
         assertEquals(tableName, 
conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
         assertEquals("!", 
conf.get(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9b285dae/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index 889bf30..56a03e2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -17,12 +17,8 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -31,14 +27,18 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class CsvToKeyValueMapperTest {
 
     @Test
     public void testCsvLineParser() throws IOException {
-        CsvToKeyValueMapper.CsvLineParser lineParser = new 
CsvToKeyValueMapper.CsvLineParser(';');
+        CsvToKeyValueMapper.CsvLineParser lineParser =
+                new CsvToKeyValueMapper.CsvLineParser(';', '"', '\\');
         CSVRecord parsed = lineParser.parse("one;two");
 
         assertEquals("one", parsed.get(0));
@@ -47,6 +47,18 @@ public class CsvToKeyValueMapperTest {
         assertEquals(1, parsed.getRecordNumber());
     }
 
+    @Test
+    public void testCsvLineParserWithQuoting() throws IOException {
+        CsvToKeyValueMapper.CsvLineParser lineParser =
+                new CsvToKeyValueMapper.CsvLineParser(';', '"', '\\');
+        CSVRecord parsed = lineParser.parse("\"\\\"one\";\"\\;two\\\\\"");
+
+        assertEquals("\"one", parsed.get(0));
+        assertEquals(";two\\", parsed.get(1));
+        assertTrue(parsed.isConsistent());
+        assertEquals(1, parsed.getRecordNumber());
+    }
+
 
     @Test
     public void testBuildColumnInfoList() {

Reply via email to