Repository: kylin Updated Branches: refs/heads/master b5a02430b -> 280f6738b
KYLIN-2996 DeployCoprocessorCLI Log failed tables info Signed-off-by: Li Yang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a79035a6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a79035a6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a79035a6 Branch: refs/heads/master Commit: a79035a63c6928eee6d556bd15bc8e5c50880ae8 Parents: b5a0243 Author: kangkaisen <[email protected]> Authored: Wed Sep 6 16:11:48 2017 +0800 Committer: Li Yang <[email protected]> Committed: Thu Dec 14 14:58:00 2017 +0800 ---------------------------------------------------------------------- .../hbase/util/DeployCoprocessorCLI.java | 27 ++++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a79035a6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index c437e66..56d5497 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -122,7 +123,7 @@ public class DeployCoprocessorCLI { Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths); logger.info("New coprocessor jar: " + hdfsCoprocessorJar); - List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); + Pair<List<String>, List<String>> results = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); // Don't remove old jars, missing coprocessor jar will fail hbase // removeOldJars(oldJarPaths, fileSystem); @@ -130,8 +131,10 @@ public class DeployCoprocessorCLI { hbaseAdmin.close(); logger.info("Processed time: " + (System.currentTimeMillis() - start)); - logger.info("Processed tables count: " + processedTables.size()); - logger.info("Processed tables: " + processedTables); + logger.info("Processed tables count: " + results.getFirst().size()); + logger.info("Processed tables: " + results.getFirst()); + logger.error("Failed tables count: " + results.getSecond().size()); + logger.error("Failed tables : " + results.getSecond()); logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); } @@ -157,7 +160,7 @@ public class DeployCoprocessorCLI { ProjectInstance projectInstance = projectManager.getProject(p); List<RealizationEntry> cubeList = projectInstance.getRealizationEntries(RealizationType.CUBE); - for (RealizationEntry cube: cubeList) { + for (RealizationEntry cube : cubeList) { CubeInstance cubeInstance = cubeManager.getCube(cube.getRealization()); for (CubeSegment segment : cubeInstance.getSegments()) { String tableName = segment.getStorageLocationIdentifier(); @@ -279,13 +282,15 @@ public class DeployCoprocessorCLI { return true; } - private static List<String> resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { + private static Pair<List<String>, List<String>> resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { List<String> processedTables = Collections.synchronizedList(new ArrayList<String>()); + List<String> failedTables = Collections.synchronizedList(new ArrayList<String>()); + ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); CountDownLatch countDownLatch = new CountDownLatch(tableNames.size()); for (final String tableName : tableNames) { - coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables)); + coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables, failedTables)); } try { @@ -295,7 +300,7 @@ public class DeployCoprocessorCLI { } coprocessorPool.shutdown(); - return processedTables; + return new Pair<>(processedTables, failedTables); } private static class ResetCoprocessorWorker implements Runnable { @@ -304,13 +309,16 @@ public class DeployCoprocessorCLI { private final Path hdfsCoprocessorJar; private final String tableName; private final List<String> processedTables; + private final List<String> failedTables; + + public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables, List<String> failedTables) { - public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables) { this.countDownLatch = countDownLatch; this.hbaseAdmin = hbaseAdmin; this.hdfsCoprocessorJar = hdfsCoprocessorJar; this.tableName = tableName; this.processedTables = processedTables; + this.failedTables = failedTables; } @Override @@ -319,8 +327,11 @@ public class DeployCoprocessorCLI { boolean isProcessed = resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); if (isProcessed) { processedTables.add(tableName); + } else { + failedTables.add(tableName); } } catch (Exception ex) { + failedTables.add(tableName); logger.error("Error processing " + tableName, ex); } finally { countDownLatch.countDown();
