Repository: phoenix
Updated Branches:
  refs/heads/master 09360c4e4 -> e6eb77121


PHOENIX-2517 Bulk load tools should support multiple input files


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e6eb7712
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e6eb7712
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e6eb7712

Branch: refs/heads/master
Commit: e6eb77121c0f543b9daf0dada6f53ea1d2518a30
Parents: 09360c4
Author: Nick Dimiduk <ndimi...@apache.org>
Authored: Fri Dec 11 12:41:49 2015 -0800
Committer: Nick Dimiduk <ndimi...@apache.org>
Committed: Tue Dec 15 12:21:50 2015 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    | 41 ++++++++++++++++++++
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 16 ++++----
 .../mapreduce/MultiHfileOutputFormat.java       |  2 +-
 3 files changed, 50 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
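
For context: with this change, --input accepts a comma-separated list of paths
rather than a single path. A minimal sketch of driving the tool programmatically
(mirroring the new IT below; the table name and ZooKeeper quorum here are
placeholders, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

    public class MultiFileBulkLoadExample {
        public static void main(String[] args) throws Exception {
            // CsvBulkLoadTool implements Tool, so ToolRunner can drive it.
            int exitCode = ToolRunner.run(new Configuration(), new CsvBulkLoadTool(),
                new String[] {
                    "--input", "/tmp/input1.csv,/tmp/input2.csv", // comma-separated, per this change
                    "--table", "MY_TABLE",                        // placeholder table name
                    "--zookeeper", "localhost:2181" });           // placeholder quorum
            System.exit(exitCode);
        }
    }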


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e6eb7712/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 4a440d6..7daacb4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -166,6 +167,46 @@ public class CsvBulkLoadToolIT {
     }
 
     @Test
+    public void testMultipleInputFiles() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.close();
+        outputStream = fs.create(new Path("/tmp/input2.csv"));
+        printWriter = new PrintWriter(outputStream);
+        printWriter.println("2,Name 2,1970/01/02");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = csvBulkLoadTool.run(new String[] {
+            "--input", "/tmp/input1.csv,/tmp/input2.csv",
+            "--table", "table7",
+            "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
     public void testImportWithIndex() throws Exception {
 
         Statement stmt = conn.createStatement();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e6eb7712/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index cf9ddef..1d2594d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -74,7 +74,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     protected static final Logger LOG = LoggerFactory.getLogger(AbstractBulkLoadTool.class);
 
     static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
-    static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path (mandatory)");
+    static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path(s) (comma-separated, mandatory)");
     static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
     static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
     static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
@@ -219,7 +219,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             conn.close();
         }
 
-        final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
+        final String inputPaths = cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt());
         final Path outputPath;
         if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
             outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
@@ -249,18 +249,18 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             tablesToBeLoaded.add(targetIndexRef);
         }
 
-        return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded);
+        return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded);
     }
 
     /**
      * Submits the jobs to the cluster.
      * Loads the HFiles onto the respective tables.
      */
-    public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath,
-                         final Path outputPath , List<TargetTableRef> tablesToBeLoaded) {
+    public int submitJob(final Configuration conf, final String qualifiedTableName,
+        final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) {
         try {
             Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
-            FileInputFormat.addInputPath(job, inputPath);
+            FileInputFormat.addInputPaths(job, inputPaths);
             FileOutputFormat.setOutputPath(job, outputPath);
 
             job.setInputFormatClass(TextInputFormat.class);
@@ -278,7 +278,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             // give subclasses their hook
             setupJob(job);
 
-            LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
+            LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
             boolean success = job.waitForCompletion(true);
 
             if (success) {
@@ -292,7 +292,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             }
             return 0;
         } catch(Exception e) {
-            LOG.error("Error {} occurred submitting BulkLoad ",e.getMessage());
+            LOG.error("Error occurred submitting BulkLoad ", e);
             return -1;
         }
 

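For context: FileInputFormat.addInputPaths(Job, String) is the Hadoop API that
makes the comma-separated form above work; it splits the string and registers
each path, where the old addInputPath(Job, Path) took exactly one. A rough
sketch of the equivalent behavior (simplified; Hadoop's real implementation
also handles commas escaped within path names):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class AddInputPathsSketch {
        // Simplified stand-in for FileInputFormat.addInputPaths(job, "a.csv,b.csv"):
        // split on commas and register each path individually.
        static void addInputPaths(Job job, String commaSeparatedPaths) throws Exception {
            for (String p : commaSeparatedPaths.split(",")) {
                FileInputFormat.addInputPath(job, new Path(p));
            }
        }
    }
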
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e6eb7712/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
index 05fbab2..7d79d64 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
@@ -687,7 +687,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
                conf.set(tableName, tableDefns);
                
                TargetTableRef tbl = TargetTableRefFunctions.FROM_JSON.apply(tableDefns);
-               LOG.error(" the table logical name is "+ tbl.getLogicalName());
+               LOG.info(" the table logical name is "+ tbl.getLogicalName());
            }
        }
     

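A note on the logging fixes folded into this commit: with SLF4J, passing the
exception itself as the final argument logs the full stack trace, whereas
binding e.getMessage() to a {} placeholder records only the message text.
Illustration (class name is illustrative only):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class BulkLoadLogging {
        private static final Logger LOG = LoggerFactory.getLogger(BulkLoadLogging.class);

        void onFailure(Exception e) {
            // Before: only the message text is recorded; the stack trace is lost.
            LOG.error("Error {} occurred submitting BulkLoad ", e.getMessage());
            // After: SLF4J treats a trailing Throwable specially and logs the stack trace.
            LOG.error("Error occurred submitting BulkLoad ", e);
        }
    }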