Repository: phoenix Updated Branches: refs/heads/master 09360c4e4 -> e6eb77121
PHOENIX-2517 Bulk load tools should support multiple input files Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e6eb7712 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e6eb7712 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e6eb7712 Branch: refs/heads/master Commit: e6eb77121c0f543b9daf0dada6f53ea1d2518a30 Parents: 09360c4 Author: Nick Dimiduk <ndimi...@apache.org> Authored: Fri Dec 11 12:41:49 2015 -0800 Committer: Nick Dimiduk <ndimi...@apache.org> Committed: Tue Dec 15 12:21:50 2015 -0800 ---------------------------------------------------------------------- .../phoenix/mapreduce/CsvBulkLoadToolIT.java | 41 ++++++++++++++++++++ .../phoenix/mapreduce/AbstractBulkLoadTool.java | 16 ++++---- .../mapreduce/MultiHfileOutputFormat.java | 2 +- 3 files changed, 50 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e6eb7712/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java index 4a440d6..7daacb4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java @@ -36,6 +36,7 @@ import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -166,6 +167,46 @@ public class CsvBulkLoadToolIT { } @Test + public void testMultipleInputFiles() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)"); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,Name 1,1970/01/01"); + printWriter.close(); + outputStream = fs.create(new Path("/tmp/input2.csv")); + printWriter = new PrintWriter(outputStream); + printWriter.println("2,Name 2,1970/01/02"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); + csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input1.csv,/tmp/input2.csv", + "--table", "table7", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("Name 2", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + + @Test public void testImportWithIndex() throws Exception { Statement stmt = conn.createStatement(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e6eb7712/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index cf9ddef..1d2594d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -74,7 +74,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { protected static final Logger LOG = LoggerFactory.getLogger(AbstractBulkLoadTool.class); static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)"); - static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path (mandatory)"); + static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path(s) (comma-separated, mandatory)"); static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)"); static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)"); static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)"); @@ -219,7 +219,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { conn.close(); } - final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt())); + final String inputPaths = cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()); final Path outputPath; if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) { outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt())); @@ -249,18 +249,18 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { tablesToBeLoaded.add(targetIndexRef); } - return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded); + return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded); } /** * Submits the jobs to the cluster. * Loads the HFiles onto the respective tables. */ - public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath, - final Path outputPath , List<TargetTableRef> tablesToBeLoaded) { + public int submitJob(final Configuration conf, final String qualifiedTableName, + final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) { try { Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName); - FileInputFormat.addInputPath(job, inputPath); + FileInputFormat.addInputPaths(job, inputPaths); FileOutputFormat.setOutputPath(job, outputPath); job.setInputFormatClass(TextInputFormat.class); @@ -278,7 +278,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { // give subclasses their hook setupJob(job); - LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath); + LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath); boolean success = job.waitForCompletion(true); if (success) { @@ -292,7 +292,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { } return 0; } catch(Exception e) { - LOG.error("Error {} occurred submitting BulkLoad ",e.getMessage()); + LOG.error("Error occurred submitting BulkLoad ", e); return -1; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e6eb7712/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java index 05fbab2..7d79d64 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java @@ -687,7 +687,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce conf.set(tableName, tableDefns); TargetTableRef tbl = TargetTableRefFunctions.FROM_JSON.apply(tableDefns); - LOG.error(" the table logical name is "+ tbl.getLogicalName()); + LOG.info(" the table logical name is "+ tbl.getLogicalName()); } }