Repository: zeppelin Updated Branches: refs/heads/master a791fad59 -> de03a21ba
ZEPPELIN-3236. Make grpc framesize configurable ### What is this PR for? Add one new property `zeppelin.ipython.grpc.framesize` which is an advanced configuration. By default it is 32M which should be sufficient for most of scenarios. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3236 ### How should this be tested? * Unit test is added. ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2802 from zjffdu/ZEPPELIN-3236 and squashes the following commits: ffce774 [Jeff Zhang] ZEPPELIN-3236. Make grpc framesize configurable Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/de03a21b Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/de03a21b Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/de03a21b Branch: refs/heads/master Commit: de03a21ba62084a37fd4ef9bba8c00e33b6644cb Parents: a791fad Author: Jeff Zhang <zjf...@apache.org> Authored: Thu Feb 15 10:30:10 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Wed Feb 21 13:27:27 2018 +0800 ---------------------------------------------------------------------- .../apache/zeppelin/python/IPythonClient.java | 8 ++++ .../zeppelin/python/IPythonInterpreter.java | 6 ++- .../src/main/resources/interpreter-setting.json | 6 +++ .../zeppelin/python/IPythonInterpreterTest.java | 40 ++++++++++++++++++-- 4 files changed, 56 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java index ac10204..b3bc7fd 100644 --- a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java +++ b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.python; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.python.proto.CancelRequest; import org.apache.zeppelin.python.proto.CancelResponse; @@ -131,11 +132,18 @@ public class IPythonClient { @Override public void onError(Throwable throwable) { try { + interpreterOutput.getInterpreterOutput().write(ExceptionUtils.getStackTrace(throwable)); interpreterOutput.getInterpreterOutput().flush(); } catch (IOException e) { LOGGER.error("Unexpected IOException", e); } LOGGER.error("Fail to call IPython grpc", throwable); + finalResponseBuilder.setStatus(ExecuteStatus.ERROR); + + completedFlag.set(true); + synchronized (completedFlag) { + completedFlag.notify(); + } } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java index 8078670..10bf530 100644 --- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.python; +import io.grpc.ManagedChannelBuilder; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteException; @@ -142,7 +143,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); LOGGER.info("Launching IPython Kernel at port: " + ipythonPort); LOGGER.info("Launching JVM Gateway at port: " + jvmGatewayPort); - ipythonClient = new IPythonClient("127.0.0.1", ipythonPort); + int framesize = Integer.parseInt(getProperty("zeppelin.ipython.grpc.framesize", + 32 * 1024 * 1024 + "")); + ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort) + .usePlaintext(true).maxInboundMessageSize(framesize)); launchIPythonKernel(ipythonPort); setupJVMGateway(jvmGatewayPort); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/python/src/main/resources/interpreter-setting.json b/python/src/main/resources/interpreter-setting.json index d6b3538..3257e58 100644 --- a/python/src/main/resources/interpreter-setting.json +++ b/python/src/main/resources/interpreter-setting.json @@ -40,6 +40,12 @@ "defaultValue": "30000", "description": "time out for ipython launch", "type": "number" + }, + "zeppelin.ipython.grpc.framesize": { + "propertyName": "zeppelin.ipython.grpc.framesize", + "defaultValue": "33554432", + "description": "grpc framesize, default is 32M", + "type": "number" } }, "editor": { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/de03a21b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index ec59482..dfc8c36 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -56,9 +56,7 @@ public class IPythonInterpreterTest { private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreterTest.class); private IPythonInterpreter interpreter; - @Before - public void setUp() throws InterpreterException { - Properties properties = new Properties(); + public void startInterpreter(Properties properties) throws InterpreterException { interpreter = new IPythonInterpreter(properties); InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class); interpreter.setInterpreterGroup(mockInterpreterGroup); @@ -73,9 +71,45 @@ public class IPythonInterpreterTest { @Test public void testIPython() throws IOException, InterruptedException, InterpreterException { + startInterpreter(new Properties()); testInterpreter(interpreter); } + @Test + public void testGrpcFrameSize() throws InterpreterException, IOException { + Properties properties = new Properties(); + properties.setProperty("zeppelin.ipython.grpc.framesize", "4"); + startInterpreter(properties); + + // to make this test can run under both python2 and python3 + InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + InterpreterContext context = getInterpreterContext(); + result = interpreter.interpret("print(11111111111111111111111111111)", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 32 exceeds maximum: 4")); + + // next call continue work + result = interpreter.interpret("print(1)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + close(); + + // increase framesize to make it work + properties.setProperty("zeppelin.ipython.grpc.framesize", "40"); + startInterpreter(properties); + // to make this test can run under both python2 and python3 + result = interpreter.interpret("from __future__ import print_function", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = getInterpreterContext(); + result = interpreter.interpret("print(11111111111111111111111111111)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + } + public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException, InterpreterException { // to make this test can run under both python2 and python3 InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());