Fix insert-loadgen example to handle insert errors

This commit makes a few changes:

- multi-threading in insert-loadgen is removed. insert-loadgen used
  separate clients per-thread anyway, so there isn't much advantage to
  running with threads over just starting more instances as separate
  processes. The threading made shutting down on errors harder since the
  shutdown had to be communicated to all threads.
- insert-loadgen now fails on row errors by periodically checking the
  accumulated errors instead of ignoring them.
- All java projects have been updated to use the latest 1.1.0 Kudu
  client.


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d2c46f2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d2c46f2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d2c46f2

Branch: refs/heads/master
Commit: 4d2c46f236562f0631ce5c6020431e0f93a5f55a
Parents: 81157cc
Author: Dan Burkert <danburk...@apache.org>
Authored: Tue Dec 6 20:37:10 2016 -0800
Committer: Dan Burkert <danburk...@apache.org>
Committed: Wed Dec 7 09:51:56 2016 -0800

----------------------------------------------------------------------
 java/collectl/pom.xml                           |  2 +-
 java/insert-loadgen/pom.xml                     |  2 +-
 .../kududb/examples/loadgen/InsertLoadgen.java  | 68 ++++++--------------
 java/java-sample/pom.xml                        |  2 +-
 4 files changed, 22 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2c46f2/java/collectl/pom.xml
----------------------------------------------------------------------
diff --git a/java/collectl/pom.xml b/java/collectl/pom.xml
index 0744dec..3727201 100644
--- a/java/collectl/pom.xml
+++ b/java/collectl/pom.xml
@@ -57,7 +57,7 @@
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
-      <version>0.10.0</version>
+      <version>1.1.0</version>
     </dependency>
 
     <!-- for logging messages -->

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2c46f2/java/insert-loadgen/pom.xml
----------------------------------------------------------------------
diff --git a/java/insert-loadgen/pom.xml b/java/insert-loadgen/pom.xml
index e7a4872..09bc0d1 100644
--- a/java/insert-loadgen/pom.xml
+++ b/java/insert-loadgen/pom.xml
@@ -57,7 +57,7 @@
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
-      <version>0.10.0</version>
+      <version>1.1.0</version>
     </dependency>
 
     <!-- for logging messages -->

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2c46f2/java/insert-loadgen/src/main/java/org/kududb/examples/loadgen/InsertLoadgen.java
----------------------------------------------------------------------
diff --git 
a/java/insert-loadgen/src/main/java/org/kududb/examples/loadgen/InsertLoadgen.java
 
b/java/insert-loadgen/src/main/java/org/kududb/examples/loadgen/InsertLoadgen.java
index 2b32af1..4b76c34 100644
--- 
a/java/insert-loadgen/src/main/java/org/kududb/examples/loadgen/InsertLoadgen.java
+++ 
b/java/insert-loadgen/src/main/java/org/kududb/examples/loadgen/InsertLoadgen.java
@@ -1,5 +1,10 @@
 package org.kududb.examples.loadgen;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.Insert;
@@ -9,14 +14,7 @@ import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.SessionConfiguration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
 public class InsertLoadgen {
-
   private static class RandomDataGenerator {
     private final Random rng;
     private final int index;
@@ -50,7 +48,7 @@ public class InsertLoadgen {
           row.addInt(index, rng.nextInt(Integer.MAX_VALUE));
           return;
         case INT64:
-        case TIMESTAMP:
+        case UNIXTIME_MICROS:
           row.addLong(index, rng.nextLong());
           return;
         case BINARY:
@@ -76,10 +74,16 @@ public class InsertLoadgen {
     }
   }
 
-  public static void runLoad(String masterHost, String tableName) {
-    KuduClient client = new KuduClient.KuduClientBuilder(masterHost).build();
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: InsertLoadgen kudu_master_host kudu_table");
+      System.exit(1);
+    }
+
+    String masterHost = args[0];
+    String tableName = args[1];
 
-    try {
+    try (KuduClient client = new 
KuduClient.KuduClientBuilder(masterHost).build()) {
       KuduTable table = client.openTable(tableName);
       Schema schema = table.getSchema();
       List<RandomDataGenerator> generators = new 
ArrayList<>(schema.getColumnCount());
@@ -89,52 +93,18 @@ public class InsertLoadgen {
 
       KuduSession session = client.newSession();
       
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
-      while (true) {
+      for (int insertCount = 0; ; insertCount++) {
         Insert insert = table.newInsert();
         PartialRow row = insert.getRow();
         for (int i = 0; i < schema.getColumnCount(); i++) {
           generators.get(i).generateColumnData(row);
         }
         session.apply(insert);
-      }
 
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      try {
-        client.shutdown();
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  public static void main(String[] args) {
-    if (args.length != 3) {
-      System.err.println("Usage: InsertLoadgen kudu_master_host kudu_table 
num_threads");
-      System.exit(1);
-    }
-
-    final String masterHost = args[0];
-    final String tableName = args[1];
-    int numThreads = Integer.parseInt(args[2]);
-
-    final CountDownLatch latch = new CountDownLatch(numThreads);
-
-    List<Thread> threads = new ArrayList<>(numThreads);
-    for (int i = 0; i < numThreads; i++) {
-      threads.add(new Thread(new Runnable() {
-        public void run() {
-          runLoad(masterHost, tableName);
-          latch.countDown();
+        if (insertCount % 1000 == 0 && session.countPendingErrors() > 0) {
+          throw new 
RuntimeException(session.getPendingErrors().getRowErrors()[0].toString());
         }
-      }));
-      threads.get(i).start();
-    }
-    try {
-      latch.await();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2c46f2/java/java-sample/pom.xml
----------------------------------------------------------------------
diff --git a/java/java-sample/pom.xml b/java/java-sample/pom.xml
index a948bf3..b078b04 100644
--- a/java/java-sample/pom.xml
+++ b/java/java-sample/pom.xml
@@ -57,7 +57,7 @@
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
-      <version>0.10.0</version>
+      <version>1.1.0</version>
     </dependency>
 
     <!-- for logging messages -->

Reply via email to