This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f307080 Better Runtime failure mgmt (#1671) f307080 is described below commit f307080e9558bf34edfffbecf9bdcdbe08fa7427 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Apr 27 11:17:50 2018 -0700 Better Runtime failure mgmt (#1671) * Retooled management of Runtime failures * Removed extra code --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../functions/instance/JavaInstanceRunnable.java | 7 +- .../pulsar/functions/runtime/JavaInstanceMain.java | 2 +- .../pulsar/functions/runtime/ProcessRuntime.java | 74 ++++++++++------------ .../pulsar/functions/runtime/RuntimeSpawner.java | 12 ++-- .../pulsar/functions/runtime/ThreadRuntime.java | 21 ++---- 6 files changed, 50 insertions(+), 68 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 1369a69..92bd757 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -565,7 +565,7 @@ public class CmdFunctions extends CmdBase { instanceConfig, userCodeFile, containerFactory, - null); + 0); spawners.add(runtimeSpawner); runtimeSpawner.start(); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index ce5220f..c16bc10 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -88,8 +88,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Getter(AccessLevel.PACKAGE) private Table<ByteBuf, ByteBuf> stateTable; - @Getter - private Exception failureException; private JavaInstance javaInstance; private AtomicBoolean running = new AtomicBoolean(true); @@ -229,10 +227,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } catch (Exception ex) { log.error("Uncaught exception in Java Instance", ex); - if (running.get()) { - failureException = ex; - throw new RuntimeException(ex); - } + throw new RuntimeException(ex); } finally { close(); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 4b14dad..77d8e97 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -189,7 +189,7 @@ public class JavaInstanceMain { instanceConfig, jarFile, containerFactory, - null); + 0); server = ServerBuilder.forPort(port) .addService(new InstanceControlImpl(runtimeSpawner)) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 6a46097..ba510cc 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -54,7 +54,8 @@ class ProcessRuntime implements Runtime { @Getter private List<String> processArgs; private int instancePort; - private Exception startupException; + @Getter + private Exception deathException; private ManagedChannel channel; private InstanceControlGrpc.InstanceControlFutureStub stub; @@ -227,8 +228,8 @@ class ProcessRuntime implements Runtime { public void onFailure(Throwable throwable) { FunctionStatus.Builder builder = FunctionStatus.newBuilder(); builder.setRunning(false); - if (startupException != null) { - builder.setFailureException(startupException.getMessage()); + if (deathException != null) { + builder.setFailureException(deathException.getMessage()); } else { builder.setFailureException(throwable.getMessage()); } @@ -280,19 +281,20 @@ class ProcessRuntime implements Runtime { } private void startProcess() { - startupException = null; + deathException = null; try { ProcessBuilder processBuilder = new ProcessBuilder(processArgs); log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command())); process = processBuilder.start(); } catch (Exception ex) { log.error("Starting process failed", ex); - startupException = ex; + deathException = ex; return; } try { int exitValue = process.exitValue(); log.error("Instance Process quit unexpectedly with return value " + exitValue); + tryExtractingDeathException(); } catch (IllegalThreadStateException ex) { log.info("Started process successfully"); } @@ -300,49 +302,41 @@ class ProcessRuntime implements Runtime { @Override public boolean isAlive() { - return process != null && process.isAlive(); + if (process == null) { + return false; + } + if (!process.isAlive()) { + if (deathException == null) { + tryExtractingDeathException(); + } + return false; + } + FunctionStatus status; + try { + status = getFunctionStatus().get(); + } catch (Exception ex) { + return false; + } + if (!status.getRunning()) { + if (status.getFailureException() != null && !status.getFailureException().isEmpty()) { + deathException = new Exception(status.getFailureException()); + } + return false; + } + return true; } - @Override - public Exception getDeathException() { - if (isAlive()) return null; - if (startupException != null) return startupException; + private void tryExtractingDeathException() { InputStream errorStream = process.getErrorStream(); try { byte[] errorBytes = new byte[errorStream.available()]; errorStream.read(errorBytes); String errorMessage = new String(errorBytes); - startupException = new RuntimeException(errorMessage); + deathException = new RuntimeException(errorMessage); + log.error("Extracted Process death exception", deathException); } catch (Exception ex) { - startupException = ex; + deathException = ex; + log.error("Error extracting Process death exception", deathException); } - return startupException; - } - - public static void main(String[] args) throws ExecutionException, InterruptedException { - int port = Integer.parseInt(args[0]); - - ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port) - .usePlaintext(true) - .build(); - InstanceControlFutureStub stub = InstanceControlGrpc.newFutureStub(channel); - ListenableFuture<FunctionStatus> response = stub.getFunctionStatus(Empty.newBuilder().build()); - CompletableFuture<FunctionStatus> future = new CompletableFuture<>(); - Futures.addCallback(response, new FutureCallback<FunctionStatus>() { - @Override - public void onFailure(Throwable throwable) { - log.info("GetFunctionStatus:", throwable); - future.completeExceptionally(throwable); - } - - @Override - public void onSuccess(InstanceCommunication.FunctionStatus t) { - log.info("GetFunctionStatus: {}", t); - future.complete(t); - } - }); - FunctionStatus status = future.get(); - - log.info("Function Status : {}", status); } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java index 272ca6a..2b020df 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java @@ -43,12 +43,13 @@ public class RuntimeSpawner implements AutoCloseable { private Runtime runtime; private Timer processLivenessCheckTimer; private int numRestarts; - private Long instanceLivenessCheckFreqMs; + private long instanceLivenessCheckFreqMs; + private Exception runtimeDeathException; public RuntimeSpawner(InstanceConfig instanceConfig, String codeFile, - RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) { + RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) { this.instanceConfig = instanceConfig; this.runtimeFactory = containerFactory; this.codeFile = codeFile; @@ -63,7 +64,7 @@ public class RuntimeSpawner implements AutoCloseable { runtime.start(); // monitor function runtime to make sure it is running. If not, restart the function runtime - if (instanceLivenessCheckFreqMs != null) { + if (instanceLivenessCheckFreqMs > 0) { processLivenessCheckTimer = new Timer(); processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() { @Override @@ -71,6 +72,7 @@ public class RuntimeSpawner implements AutoCloseable { if (!runtime.isAlive()) { log.error("Function Container is dead with exception", runtime.getDeathException()); log.error("Restarting..."); + runtimeDeathException = runtime.getDeathException(); runtime.start(); numRestarts++; } @@ -89,8 +91,8 @@ public class RuntimeSpawner implements AutoCloseable { return runtime.getFunctionStatus().thenApply(f -> { FunctionStatus.Builder builder = FunctionStatus.newBuilder(); builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceId()); - if (runtime.getDeathException() != null) { - builder.setFailureException(runtime.getDeathException().getMessage()); + if (runtimeDeathException != null) { + builder.setFailureException(runtimeDeathException.getMessage()); } return builder.build(); }); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 136951e..a0d5fb8 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -80,14 +80,6 @@ class ThreadRuntime implements Runtime { public void uncaughtException(Thread t, Throwable e) { startupException = new Exception(e); log.error("Error occured in java instance:", e); - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - //ignore - } - // restart - start(); - } }); this.fnThread.start(); @@ -117,13 +109,14 @@ class ThreadRuntime implements Runtime { @Override public CompletableFuture<FunctionStatus> getFunctionStatus() { - FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus(); - if (javaInstanceRunnable.getFailureException() != null) { + if (!isAlive()) { + FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); functionStatusBuilder.setRunning(false); - functionStatusBuilder.setFailureException(javaInstanceRunnable.getFailureException().getMessage()); - } else { - functionStatusBuilder.setRunning(true); + functionStatusBuilder.setFailureException(getDeathException().getMessage()); + return CompletableFuture.completedFuture(functionStatusBuilder.build()); } + FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus(); + functionStatusBuilder.setRunning(true); return CompletableFuture.completedFuture(functionStatusBuilder.build()); } @@ -147,8 +140,6 @@ class ThreadRuntime implements Runtime { return null; } else if (null != startupException) { return startupException; - } else if (null != javaInstanceRunnable){ - return javaInstanceRunnable.getFailureException(); } else { return null; } -- To stop receiving notification emails like this one, please contact si...@apache.org.