This is an automated email from the ASF dual-hosted git repository.

jmark99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a71fb7  Update BulkIngest example to work with 2.0
     new befc2e3  Merge pull request #63 from jmark99/bingest
8a71fb7 is described below

commit 8a71fb7a967f57f69ec65a8c8012a7dc4915c911
Author: Mark Owens <jmar...@apache.org>
AuthorDate: Fri Feb 12 14:29:51 2021 -0500

    Update BulkIngest example to work with 2.0
    
    Update the bulkIngest example to work correctly with Accumulo 2.0.x.
    Minor updates were made to the documentation, and the classes were
    refactored to accommodate the 2.0 changes. Generation of test data
    was moved from SetupTable to BulkIngestExample (as in the 1.10.x
    version). Prior to this change, the necessary data was not being
    written to HDFS correctly, which prevented the BulkIngestExample
    class from finding the information it needed to generate the data.
---
 docs/bulkIngest.md                                 | 10 ++---
 .../examples/mapreduce/bulk/BulkIngestExample.java | 51 ++++++++++++++++------
 .../examples/mapreduce/bulk/SetupTable.java        | 30 ++++---------
 .../examples/mapreduce/bulk/VerifyIngest.java      | 27 +++++++-----
 4 files changed, 64 insertions(+), 54 deletions(-)

diff --git a/docs/bulkIngest.md b/docs/bulkIngest.md
index d581b41..6edee37 100644
--- a/docs/bulkIngest.md
+++ b/docs/bulkIngest.md
@@ -16,18 +16,14 @@ limitations under the License.
 -->
 # Apache Accumulo Bulk Ingest Example
 
-This is an example of how to bulk ingest data into Accumulo using map reduce.
+This is an example of how to bulk ingest data into Accumulo using MapReduce.
 
 This tutorial uses the following Java classes.
 
- * [SetupTable.java] - creates the table and some data to ingest
- * [BulkIngestExample.java] - ingest the data using map reduce
+ * [SetupTable.java] - creates the table, 'test_bulk', and sets two split points.
+ * [BulkIngestExample.java] - creates some data to ingest and then ingests the data using MapReduce
  * [VerifyIngest.java] - checks that the data was ingested
  
-Remember to copy the accumulo-examples\*.jar to Accumulo's 'lib/ext' directory.
-
-    $ cp target/accumulo-examples*.jar /path/accumulo/lib/ext
-
 The following commands show how to run this example. This example creates a
 table called test_bulk which has two initial split points. Then 1000 rows of
 test data are created in HDFS. After that the 1000 rows are ingested into
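
The commands themselves follow in the unchanged portion of this file
(truncated by the diff context). As a rough sketch of the run order
(assuming the repository's bin/runex and bin/runmr launcher scripts;
verify the exact invocations against your checkout):

    $ ./bin/runex mapreduce.bulk.SetupTable
    $ ./bin/runmr mapreduce.bulk.BulkIngestExample
    $ ./bin/runex mapreduce.bulk.VerifyIngest
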
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
index 835b3ab..0791265 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
@@ -42,13 +42,21 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  * Example MapReduce job that bulk ingests data into an Accumulo table. The expected input is text
  * files containing tab separated key value pairs on each line.
  */
-public class BulkIngestExample {
-  static String workDir = "tmp/bulkWork";
-  static String inputDir = "bulk";
+public final class BulkIngestExample {
+  static final String workDir = "tmp/bulkWork";
+  static final String inputDir = "bulk";
+  static final String outputFile = "bulk/test_1.txt";
+  static final int numRows = 1000;
+
+  static final String SLASH_FILES = "/files";
+  static final String FAILURES = "failures";
+  static final String SPLITS_TXT = "/splits.txt";
+
+  private BulkIngestExample() {}
 
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
+    private final Text outputKey = new Text();
+    private final Text outputValue = new Text();
 
     @Override
     public void map(LongWritable key, Text value, Context output)
@@ -94,10 +102,25 @@ public class BulkIngestExample {
     }
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void main(String[] args) throws Exception {
     ClientOpts opts = new ClientOpts();
     opts.parseArgs(BulkIngestExample.class.getName(), args);
+    FileSystem fs = FileSystem.get(opts.getHadoopConfig());
+
+    generateTestData(fs);
+    System.exit(ingestTestData(fs, opts));
+  }
 
+  private static void generateTestData(FileSystem fs) throws IOException {
+    try (PrintStream out = new PrintStream(
+        new BufferedOutputStream(fs.create(new Path(outputFile))))) {
+      for (int i = 0; i < numRows; i++) {
+        out.printf("row_%010d\tvalue_%010d%n", i, i);
+      }
+    }
+  }
+
+  private static int ingestTestData(FileSystem fs, ClientOpts opts) throws Exception {
     Job job = Job.getInstance(opts.getHadoopConfig());
     job.setJobName(BulkIngestExample.class.getSimpleName());
     job.setJarByClass(BulkIngestExample.class);
@@ -112,13 +135,12 @@ public class BulkIngestExample {
     job.setOutputFormatClass(AccumuloFileOutputFormat.class);
 
     TextInputFormat.setInputPaths(job, new Path(inputDir));
-    AccumuloFileOutputFormat.configure().outputPath(new Path(workDir + "/files")).store(job);
+    AccumuloFileOutputFormat.configure().outputPath(new Path(workDir + SLASH_FILES)).store(job);
 
-    FileSystem fs = FileSystem.get(opts.getHadoopConfig());
     try (AccumuloClient client = opts.createAccumuloClient()) {
 
       try (PrintStream out = new PrintStream(
-          new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))))) {
+          new BufferedOutputStream(fs.create(new Path(workDir + SPLITS_TXT))))) {
         Collection<Text> splits = client.tableOperations().listSplits(SetupTable.tableName, 100);
         for (Text split : splits)
           out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
@@ -126,17 +148,18 @@ public class BulkIngestExample {
       }
 
       job.setPartitionerClass(RangePartitioner.class);
-      RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
+      RangePartitioner.setSplitFile(job, workDir + SPLITS_TXT);
 
       job.waitForCompletion(true);
-      Path failures = new Path(workDir, "failures");
+      Path failures = new Path(workDir, FAILURES);
       fs.delete(failures, true);
-      fs.mkdirs(new Path(workDir, "failures"));
+      fs.mkdirs(new Path(workDir, FAILURES));
       // With HDFS permissions on, we need to make sure the Accumulo user can read/move the rfiles
       FsShell fsShell = new FsShell(opts.getHadoopConfig());
       fsShell.run(new String[] {"-chmod", "-R", "777", workDir});
-      client.tableOperations().importDirectory(workDir + "/files").to(SetupTable.tableName).load();
+      client.tableOperations().importDirectory(workDir + SLASH_FILES).to(SetupTable.tableName)
+          .load();
     }
-    System.exit(job.isSuccessful() ? 0 : 1);
+    return job.isSuccessful() ? 0 : 1;
   }
 }
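
As a minimal, self-contained sketch of the Accumulo 2.0 bulk import call
this patch adopts (the client properties path, directory, and class name
are illustrative placeholders, not code from the commit):

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;

    public class BulkImportSketch {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client = Accumulo.newClient()
            .from("/path/to/accumulo-client.properties").build()) {
          // 2.0 fluent bulk import: point at a directory of rfiles and load
          // them into the table. The 1.x API instead took the table name,
          // import directory, failure directory, and a setTime flag in a
          // single importDirectory call.
          client.tableOperations().importDirectory("/tmp/bulkWork/files")
              .to("test_bulk").load();
        }
      }
    }
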
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
index a917783..ef4edb3 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
@@ -16,25 +16,20 @@
  */
 package org.apache.accumulo.examples.mapreduce.bulk;
 
-import java.io.BufferedOutputStream;
-import java.io.PrintStream;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.examples.cli.ClientOpts;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
-public class SetupTable {
+public final class SetupTable {
 
-  static String[] splits = {"row_00000333", "row_00000666"};
-  static String tableName = "test_bulk";
-  static int numRows = 1000;
-  static String outputFile = "bulk/test_1.txt";
+  static final String[] splits = {"row_00000333", "row_00000666"};
+  static final String tableName = "test_bulk";
+
+  private SetupTable() {}
 
   public static void main(String[] args) throws Exception {
     ClientOpts opts = new ClientOpts();
@@ -48,20 +43,11 @@ public class SetupTable {
       }
 
       // create a table with initial partitions
-      TreeSet<Text> intialPartitions = new TreeSet<>();
+      TreeSet<Text> initialPartitions = new TreeSet<>();
       for (String split : splits) {
-        intialPartitions.add(new Text(split));
-      }
-      client.tableOperations().addSplits(tableName, intialPartitions);
-
-      FileSystem fs = FileSystem.get(new Configuration());
-      try (PrintStream out = new PrintStream(
-          new BufferedOutputStream(fs.create(new Path(outputFile))))) {
-        // create some data in outputFile
-        for (int i = 0; i < numRows; i++) {
-          out.println(String.format("row_%010d\tvalue_%010d", i, i));
-        }
+        initialPartitions.add(new Text(split));
       }
+      client.tableOperations().addSplits(tableName, initialPartitions);
     }
   }
 }
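
Equivalently, as a runnable sketch of the table setup this file now
performs (the properties path and class name are placeholders):

    import java.util.TreeSet;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.hadoop.io.Text;

    public class CreateTableSketch {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client = Accumulo.newClient()
            .from("/path/to/accumulo-client.properties").build()) {
          // Create the example table, then add its two split points.
          client.tableOperations().create("test_bulk");
          TreeSet<Text> splits = new TreeSet<>();
          splits.add(new Text("row_00000333"));
          splits.add(new Text("row_00000666"));
          client.tableOperations().addSplits("test_bulk", splits);
        }
      }
    }
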
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
index 91a3468..dc354b2 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
@@ -31,9 +31,13 @@ import org.apache.accumulo.examples.cli.ClientOpts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class VerifyIngest {
+public final class VerifyIngest {
 
   private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);
+  private static final String ROW_FORMAT = "row_%010d";
+  private static final String VALUE_FORMAT = "value_%010d";
+
+  private VerifyIngest() {}
 
   public static void main(String[] args) throws TableNotFoundException {
 
@@ -43,31 +47,31 @@ public class VerifyIngest {
     try (AccumuloClient client = Accumulo.newClient().from(opts.getClientPropsPath()).build();
         Scanner scanner = client.createScanner(SetupTable.tableName, Authorizations.EMPTY)) {
 
-      scanner.setRange(new Range(String.format("row_%010d", 0), null));
+      scanner.setRange(new Range(String.format(ROW_FORMAT, 0), null));
 
       Iterator<Entry<Key,Value>> si = scanner.iterator();
 
       boolean ok = true;
 
-      for (int i = 0; i < SetupTable.numRows; i++) {
+      for (int i = 0; i < BulkIngestExample.numRows; i++) {
 
         if (si.hasNext()) {
           Entry<Key,Value> entry = si.next();
 
-          if (!entry.getKey().getRow().toString().equals(String.format("row_%010d", i))) {
-            log.error("unexpected row key " + entry.getKey().getRow().toString() + " expected "
-                + String.format("row_%010d", i));
+          if (!entry.getKey().getRow().toString().equals(String.format(ROW_FORMAT, i))) {
+            log.error("unexpected row key {}; expected {}", entry.getKey().getRow(),
+                String.format(ROW_FORMAT, i));
             ok = false;
           }
 
-          if (!entry.getValue().toString().equals(String.format("value_%010d", i))) {
-            log.error("unexpected value " + entry.getValue().toString() + " expected "
-                + String.format("value_%010d", i));
+          if (!entry.getValue().toString().equals(String.format(VALUE_FORMAT, i))) {
+            log.error("unexpected value {}; expected {}", entry.getValue(),
+                String.format(VALUE_FORMAT, i));
             ok = false;
           }
 
         } else {
-          log.error("no more rows, expected " + String.format("row_%010d", i));
+          log.error("no more rows, expected {}", String.format(ROW_FORMAT, i));
           ok = false;
           break;
         }
@@ -75,9 +79,10 @@ public class VerifyIngest {
       }
 
       if (ok) {
-        System.out.println("OK");
+        System.out.println("Data verification succeeded!");
         System.exit(0);
       } else {
+        System.out.println("Data verification failed!");
         System.exit(1);
       }
     }

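After the job completes, a quick spot check along the lines of
VerifyIngest (a sketch; the properties path and class name are
placeholders) can confirm the expected 1000 entries:

    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    public class SpotCheck {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client = Accumulo.newClient()
            .from("/path/to/accumulo-client.properties").build();
            Scanner scanner = client.createScanner("test_bulk", Authorizations.EMPTY)) {
          int count = 0;
          for (Entry<Key,Value> entry : scanner) {
            count++; // each entry pairs a row_%010d key with a value_%010d value
          }
          System.out.println(count + " entries scanned"); // expect 1000
        }
      }
    }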