Repository: phoenix
Updated Branches:
  refs/heads/4.5-HBase-1.0 b2cb0005b -> c35c47571


PHOENIX-2238 Support non-printable delimiters

Work around serialization issues with non-printable characters
in Hadoop Configuration objects by Base64-encoding the delimiter
characters for CSV bulk load.
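
As a sketch of the underlying idea (illustrative only; the class name is made
up, and java.util.Base64 is used here instead of the HBase Base64 helper the
patch itself relies on): a raw control character such as '\001' can be mangled
by the Configuration's XML serialization, but its Base64 form is plain
printable ASCII.

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class DelimiterCodecSketch {
        public static void main(String[] args) {
            char delimiter = '\001'; // Ctrl-A, a common non-printable field delimiter

            // Encode before storing in the Configuration: the Base64 output
            // is printable ASCII, so it survives the XML round trip.
            String encoded = Base64.getEncoder().encodeToString(
                    Character.toString(delimiter).getBytes(StandardCharsets.UTF_8));

            // Decode on the mapper side to recover the original character.
            char decoded = new String(Base64.getDecoder().decode(encoded),
                    StandardCharsets.UTF_8).charAt(0);

            System.out.println(encoded + " -> " + (int) decoded); // prints: AQ== -> 1
        }
    }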


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c35c4757
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c35c4757
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c35c4757

Branch: refs/heads/4.5-HBase-1.0
Commit: c35c475711593b2dfc1f8a9ec05c09688cca3e7c
Parents: b2cb000
Author: Gabriel Reid <gabri...@ngdata.com>
Authored: Fri Sep 11 22:50:01 2015 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Sat Sep 19 20:31:17 2015 +0200

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkImportUtil.java    | 22 ++++++--
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  6 ++-
 .../mapreduce/CsvBulkImportUtilTest.java        | 57 ++++++++++++++++----
 3 files changed, 71 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c35c4757/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index 8f0f7d5..6d77cd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -19,7 +19,9 @@ package org.apache.phoenix.mapreduce;
 
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.ColumnInfo;
 
@@ -49,9 +51,9 @@ public class CsvBulkImportUtil {
         Preconditions.checkNotNull(columnInfoList);
         Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty");
         conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
-        conf.set(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, String.valueOf(fieldDelimiter));
-        conf.set(CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, String.valueOf(quoteChar));
-        conf.set(CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, String.valueOf(escapeChar));
+        setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter);
+        setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar);
+        setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar);
         if (arrayDelimiter != null) {
             conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter);
         }
@@ -70,4 +72,18 @@ public class CsvBulkImportUtil {
         conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
                 ImportPreUpsertKeyValueProcessor.class);
     }
+
+    @VisibleForTesting
+    static void setChar(Configuration conf, String confKey, char charValue) {
+        conf.set(confKey, Base64.encodeBytes(Character.toString(charValue).getBytes()));
+    }
+
+    @VisibleForTesting
+    static Character getCharacter(Configuration conf, String confKey) {
+        String strValue = conf.get(confKey);
+        if (strValue == null) {
+            return null;
+        }
+        return new String(Base64.decode(strValue)).charAt(0);
+    }
 }
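
For illustration (not part of the patch), the new helpers round-trip as follows; the conf key below is made up:

    Configuration conf = new Configuration();
    CsvBulkImportUtil.setChar(conf, "example.delimiter.key", '\001'); // stored as the printable string "AQ=="
    Character decoded = CsvBulkImportUtil.getCharacter(conf, "example.delimiter.key"); // yields '\001'

Note that getCharacter returns a boxed Character rather than a char, so an absent key can be reported as null (exercised by testGetChar_NotPresent in the test changes below).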

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c35c4757/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index c0328bd..87420c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -124,8 +124,10 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
         upsertListener = new MapperUpsertListener(
                 context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
         csvUpsertExecutor = buildUpsertExecutor(conf);
-        csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0),
-                conf.get(ESCAPE_CHAR_CONFKEY).charAt(0));
+        csvLineParser = new CsvLineParser(
+                CsvBulkImportUtil.getCharacter(conf, FIELD_DELIMITER_CONFKEY),
+                CsvBulkImportUtil.getCharacter(conf, QUOTE_CHAR_CONFKEY),
+                CsvBulkImportUtil.getCharacter(conf, ESCAPE_CHAR_CONFKEY));
 
         preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){

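A side note on the mapper change above: getCharacter returns a boxed Character, so the three values auto-unbox when passed to the CsvLineParser constructor (assuming it takes char parameters, as the old charAt(0) calls suggest); that is safe here because initCsvImportJob always sets all three keys.
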
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c35c4757/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index a00e228..f52a837 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -20,6 +20,10 @@ package org.apache.phoenix.mapreduce;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,13 +38,13 @@ import com.google.common.collect.ImmutableList;
 public class CsvBulkImportUtilTest {
 
     @Test
-    public void testInitCsvImportJob() {
+    public void testInitCsvImportJob() throws IOException {
         Configuration conf = new Configuration();
 
         String tableName = "SCHEMANAME.TABLENAME";
-        char delimiter = '!';
-        char quote = '"';
-        char escape = '\\';
+        char delimiter = '\001';
+        char quote = '\002';
+        char escape = '!';
 
         List<ColumnInfo> columnInfoList = ImmutableList.of(
                 new ColumnInfo("MYCOL", PInteger.INSTANCE.getSqlType()));
@@ -48,11 +52,27 @@ public class CsvBulkImportUtilTest {
         CsvBulkImportUtil.initCsvImportJob(
                 conf, tableName, delimiter, quote, escape, null, columnInfoList, true);
 
-        assertEquals(tableName, conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
-        assertEquals("!", conf.get(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY));
-        assertNull(conf.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY));
-        assertEquals(columnInfoList, CsvToKeyValueMapper.buildColumnInfoList(conf));
-        assertEquals(true, conf.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false));
+        // Serialize and deserialize the config to ensure that there aren't any issues
+        // with non-printable characters as delimiters
+        File tempFile = File.createTempFile("test-config", ".xml");
+        FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
+        conf.writeXml(fileOutputStream);
+        fileOutputStream.close();
+        Configuration deserialized = new Configuration();
+        deserialized.addResource(new FileInputStream(tempFile));
+
+        assertEquals(tableName, deserialized.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
+        assertEquals(Character.valueOf('\001'),
+                CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY));
+        assertEquals(Character.valueOf('\002'),
+                CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY));
+        assertEquals(Character.valueOf('!'),
+                CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY));
+        assertNull(deserialized.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY));
+        assertEquals(columnInfoList, CsvToKeyValueMapper.buildColumnInfoList(deserialized));
+        assertEquals(true, deserialized.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false));
+
+        tempFile.delete();
     }
 
     @Test
@@ -63,6 +83,25 @@ public class CsvBulkImportUtilTest {
         assertEquals(MockProcessor.class, processor.getClass());
     }
 
+    @Test
+    public void testGetAndSetChar_BasicChar() {
+        Configuration conf = new Configuration();
+        CsvBulkImportUtil.setChar(conf, "conf.key", '|');
+        assertEquals(Character.valueOf('|'), CsvBulkImportUtil.getCharacter(conf, "conf.key"));
+    }
+
+    @Test
+    public void testGetAndSetChar_NonPrintableChar() {
+        Configuration conf = new Configuration();
+        CsvBulkImportUtil.setChar(conf, "conf.key", '\001');
+        assertEquals(Character.valueOf('\001'), CsvBulkImportUtil.getCharacter(conf, "conf.key"));
+    }
+
+    @Test
+    public void testGetChar_NotPresent() {
+        Configuration conf = new Configuration();
+        assertNull(CsvBulkImportUtil.getCharacter(conf, "conf.key"));
+    }
 
     public static class MockProcessor implements ImportPreUpsertKeyValueProcessor {
 
