Repository: phoenix Updated Branches: refs/heads/4.5-HBase-1.0 b2cb0005b -> c35c47571
PHOENIX-2238 Support non-printable delimiters Work around serialization issues for non-printable characters in Hadoop Configuration objects by base64-encoding the delimiter characters for CSV bulk load. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c35c4757 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c35c4757 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c35c4757 Branch: refs/heads/4.5-HBase-1.0 Commit: c35c475711593b2dfc1f8a9ec05c09688cca3e7c Parents: b2cb000 Author: Gabriel Reid <gabri...@ngdata.com> Authored: Fri Sep 11 22:50:01 2015 +0200 Committer: Gabriel Reid <gr...@apache.org> Committed: Sat Sep 19 20:31:17 2015 +0200 ---------------------------------------------------------------------- .../phoenix/mapreduce/CsvBulkImportUtil.java | 22 ++++++-- .../phoenix/mapreduce/CsvToKeyValueMapper.java | 6 ++- .../mapreduce/CsvBulkImportUtilTest.java | 57 ++++++++++++++++---- 3 files changed, 71 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c35c4757/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java index 8f0f7d5..6d77cd5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java @@ -19,7 +19,9 @@ package org.apache.phoenix.mapreduce; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Base64; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.ColumnInfo; @@ -49,9 +51,9 @@ public class CsvBulkImportUtil { Preconditions.checkNotNull(columnInfoList); Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty"); conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName); - conf.set(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, String.valueOf(fieldDelimiter)); - conf.set(CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, String.valueOf(quoteChar)); - conf.set(CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, String.valueOf(escapeChar)); + setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter); + setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar); + setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar); if (arrayDelimiter != null) { conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter); } @@ -70,4 +72,18 @@ public class CsvBulkImportUtil { conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, processorClass, ImportPreUpsertKeyValueProcessor.class); } + + @VisibleForTesting + static void setChar(Configuration conf, String confKey, char charValue) { + conf.set(confKey, Base64.encodeBytes(Character.toString(charValue).getBytes())); + } + + @VisibleForTesting + static Character getCharacter(Configuration conf, String confKey) { + String strValue = conf.get(confKey); + if (strValue == null) { + return null; + } + return new String(Base64.decode(strValue)).charAt(0); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c35c4757/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index c0328bd..87420c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -124,8 +124,10 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes upsertListener = new MapperUpsertListener( context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); csvUpsertExecutor = buildUpsertExecutor(conf); - csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0), - conf.get(ESCAPE_CHAR_CONFKEY).charAt(0)); + csvLineParser = new CsvLineParser( + CsvBulkImportUtil.getCharacter(conf, FIELD_DELIMITER_CONFKEY), + CsvBulkImportUtil.getCharacter(conf, QUOTE_CHAR_CONFKEY), + CsvBulkImportUtil.getCharacter(conf, ESCAPE_CHAR_CONFKEY)); preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){ http://git-wip-us.apache.org/repos/asf/phoenix/blob/c35c4757/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java index a00e228..f52a837 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java @@ -20,6 +20,10 @@ package org.apache.phoenix.mapreduce; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -34,13 +38,13 @@ import com.google.common.collect.ImmutableList; public class CsvBulkImportUtilTest { @Test - public void testInitCsvImportJob() { + public void testInitCsvImportJob() throws IOException { Configuration conf = new Configuration(); String tableName = "SCHEMANAME.TABLENAME"; - char delimiter = '!'; - char quote = '"'; - char escape = '\\'; + char delimiter = '\001'; + char quote = '\002'; + char escape = '!'; List<ColumnInfo> columnInfoList = ImmutableList.of( new ColumnInfo("MYCOL", PInteger.INSTANCE.getSqlType())); @@ -48,11 +52,27 @@ public class CsvBulkImportUtilTest { CsvBulkImportUtil.initCsvImportJob( conf, tableName, delimiter, quote, escape, null, columnInfoList, true); - assertEquals(tableName, conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY)); - assertEquals("!", conf.get(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY)); - assertNull(conf.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY)); - assertEquals(columnInfoList, CsvToKeyValueMapper.buildColumnInfoList(conf)); - assertEquals(true, conf.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false)); + // Serialize and deserialize the config to ensure that there aren't any issues + // with non-printable characters as delimiters + File tempFile = File.createTempFile("test-config", ".xml"); + FileOutputStream fileOutputStream = new FileOutputStream(tempFile); + conf.writeXml(fileOutputStream); + fileOutputStream.close(); + Configuration deserialized = new Configuration(); + deserialized.addResource(new FileInputStream(tempFile)); + + assertEquals(tableName, deserialized.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY)); + assertEquals(Character.valueOf('\001'), + CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY)); + assertEquals(Character.valueOf('\002'), + CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY)); + assertEquals(Character.valueOf('!'), + CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY)); + assertNull(deserialized.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY)); + assertEquals(columnInfoList, CsvToKeyValueMapper.buildColumnInfoList(deserialized)); + assertEquals(true, deserialized.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false)); + + tempFile.delete(); } @Test @@ -63,6 +83,25 @@ public class CsvBulkImportUtilTest { assertEquals(MockProcessor.class, processor.getClass()); } + @Test + public void testGetAndSetChar_BasicChar() { + Configuration conf = new Configuration(); + CsvBulkImportUtil.setChar(conf, "conf.key", '|'); + assertEquals(Character.valueOf('|'), CsvBulkImportUtil.getCharacter(conf, "conf.key")); + } + + @Test + public void testGetAndSetChar_NonPrintableChar() { + Configuration conf = new Configuration(); + CsvBulkImportUtil.setChar(conf, "conf.key", '\001'); + assertEquals(Character.valueOf('\001'), CsvBulkImportUtil.getCharacter(conf, "conf.key")); + } + + @Test + public void testGetChar_NotPresent() { + Configuration conf = new Configuration(); + assertNull(CsvBulkImportUtil.getCharacter(conf, "conf.key")); + } public static class MockProcessor implements ImportPreUpsertKeyValueProcessor {