Repository: kylin
Updated Branches:
  refs/heads/master b5a02430b -> 280f6738b


KYLIN-2996 DeployCoprocessorCLI Log failed tables info

Signed-off-by: Li Yang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a79035a6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a79035a6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a79035a6

Branch: refs/heads/master
Commit: a79035a63c6928eee6d556bd15bc8e5c50880ae8
Parents: b5a0243
Author: kangkaisen <[email protected]>
Authored: Wed Sep 6 16:11:48 2017 +0800
Committer: Li Yang <[email protected]>
Committed: Thu Dec 14 14:58:00 2017 +0800

----------------------------------------------------------------------
 .../hbase/util/DeployCoprocessorCLI.java        | 27 ++++++++++++++------
 1 file changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a79035a6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index c437e66..56d5497 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -122,7 +123,7 @@ public class DeployCoprocessorCLI {
         Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, 
fileSystem, oldJarPaths);
         logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
 
-        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, 
hdfsCoprocessorJar, tableNames);
+        Pair<List<String>, List<String>> results = 
resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
 
         // Don't remove old jars, missing coprocessor jar will fail hbase
         // removeOldJars(oldJarPaths, fileSystem);
@@ -130,8 +131,10 @@ public class DeployCoprocessorCLI {
         hbaseAdmin.close();
 
         logger.info("Processed time: " + (System.currentTimeMillis() - start));
-        logger.info("Processed tables count: " + processedTables.size());
-        logger.info("Processed tables: " + processedTables);
+        logger.info("Processed tables count: " + results.getFirst().size());
+        logger.info("Processed tables: " + results.getFirst());
+        logger.error("Failed tables count: " + results.getSecond().size());
+        logger.error("Failed tables : " + results.getSecond());
         logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
     }
 
@@ -157,7 +160,7 @@ public class DeployCoprocessorCLI {
 
             ProjectInstance projectInstance = projectManager.getProject(p);
             List<RealizationEntry> cubeList = 
projectInstance.getRealizationEntries(RealizationType.CUBE);
-            for (RealizationEntry cube: cubeList) {
+            for (RealizationEntry cube : cubeList) {
                 CubeInstance cubeInstance = 
cubeManager.getCube(cube.getRealization());
                 for (CubeSegment segment : cubeInstance.getSegments()) {
                     String tableName = segment.getStorageLocationIdentifier();
@@ -279,13 +282,15 @@ public class DeployCoprocessorCLI {
         return true;
     }
 
-    private static List<String> resetCoprocessorOnHTables(final Admin 
hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws 
IOException {
+    private static Pair<List<String>, List<String>> 
resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path 
hdfsCoprocessorJar, List<String> tableNames) throws IOException {
         List<String> processedTables = Collections.synchronizedList(new 
ArrayList<String>());
+        List<String> failedTables = Collections.synchronizedList(new 
ArrayList<String>());
+
         ExecutorService coprocessorPool = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
         CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
 
         for (final String tableName : tableNames) {
-            coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, 
hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables));
+            coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, 
hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables, failedTables));
         }
 
         try {
@@ -295,7 +300,7 @@ public class DeployCoprocessorCLI {
         }
 
         coprocessorPool.shutdown();
-        return processedTables;
+        return new Pair<>(processedTables, failedTables);
     }
 
     private static class ResetCoprocessorWorker implements Runnable {
@@ -304,13 +309,16 @@ public class DeployCoprocessorCLI {
         private final Path hdfsCoprocessorJar;
         private final String tableName;
         private final List<String> processedTables;
+        private final List<String> failedTables;
+
+        public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin 
hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> 
processedTables, List<String> failedTables) {
 
-        public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin 
hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> 
processedTables) {
             this.countDownLatch = countDownLatch;
             this.hbaseAdmin = hbaseAdmin;
             this.hdfsCoprocessorJar = hdfsCoprocessorJar;
             this.tableName = tableName;
             this.processedTables = processedTables;
+            this.failedTables = failedTables;
         }
 
         @Override
@@ -319,8 +327,11 @@ public class DeployCoprocessorCLI {
                 boolean isProcessed = resetCoprocessor(tableName, hbaseAdmin, 
hdfsCoprocessorJar);
                 if (isProcessed) {
                     processedTables.add(tableName);
+                } else {
+                    failedTables.add(tableName);
                 }
             } catch (Exception ex) {
+                failedTables.add(tableName);
                 logger.error("Error processing " + tableName, ex);
             } finally {
                 countDownLatch.countDown();

Reply via email to