This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 7ccf608 [ZEPPELIN-4091] Fix concurrent autocomplete and execute for
Ipython
7ccf608 is described below
commit 7ccf608ee72470a38cf07ea16cba2cd292b6f39c
Author: marc hurabielle <[email protected]>
AuthorDate: Sat Sep 7 15:22:32 2019 +0900
[ZEPPELIN-4091] Fix concurrent autocomplete and execute for Ipython
### What is this PR for?
This PR fixes a bug that makes the **IPython** `execute_interactive` call
hang forever if an auto-`complete` call is made at the same time (see the unit
test for an example that fails on master).
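For now, the fix is to synchronize these two methods, `execute` and `complete`,
with a single lock. This does not introduce a regression, because the kernel
does not support concurrent execution and auto-completion anyway (see
https://github.com/jupyter/notebook/issues/3763). As a consequence, a
`complete` request issued while a paragraph is running now waits until that
execution finishes.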
### What type of PR is it?
Bug Fix
### Todos
* [x] - unit test failing in master / succeed on this branch
* [x] - fix with lock
### What is the Jira issue?
This is one part of the JIRA issue; other fixes will follow.
https://issues.apache.org/jira/browse/ZEPPELIN-4091
### How should this be tested?
* First time? Setup Travis CI as described on
https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed
behavior
* Outline any manual steps to test the PR here.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: marc hurabielle <[email protected]>
Closes #3336 from AyWa/fix/concurrent-auto-complete and squashes the
following commits:
86dab7345 [marc hurabielle] fix rebase
5bed19496 [marc hurabielle] fix lint
6e48c1380 [marc hurabielle] try single threaded
f14d8b242 [marc hurabielle] Revert "just test ci behavior"
be6663f89 [marc hurabielle] just test ci behavior
bc2b4f6e6 [marc hurabielle] bring back test
d43f03da9 [marc hurabielle] use initIntpProperties instead of empty one
c37414cc2 [marc hurabielle] increase timeout
f7cae9538 [marc hurabielle] move synchronize near the thread check
616f0122f [marc hurabielle] add test to ensure that autocomplete and
interpret can be call concurrently
409b75f0f [marc hurabielle] add lock to ensure ipython execute will not be
stuck forever when complete is call
---
.../main/resources/grpc/python/ipython_server.py | 74 ++++++++++++----------
.../zeppelin/python/IPythonInterpreterTest.java | 50 +++++++++++++++
2 files changed, 89 insertions(+), 35 deletions(-)
diff --git a/python/src/main/resources/grpc/python/ipython_server.py
b/python/src/main/resources/grpc/python/ipython_server.py
index 3fd0a8c..47f67b7 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -38,6 +38,10 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
def __init__(self, server):
self._status = ipython_pb2.STARTING
self._server = server
+ # issue with execute_interactive and auto completion:
https://github.com/jupyter/jupyter_client/issues/429
+ # in all case because ipython does not support run and auto completion
at the same time: https://github.com/jupyter/notebook/issues/3763
+ # For now we will lock to ensure that there is no concurrent bug that
can "hang" the kernel
+ self._lock = threading.Lock()
def start(self):
print("starting...")
@@ -83,43 +87,42 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
payload_reply = []
def execute_worker():
reply = self._kc.execute_interactive(request.code,
- output_hook=_output_hook,
- timeout=None)
+ output_hook=_output_hook,
+ timeout=None)
payload_reply.append(reply)
t = threading.Thread(name="ConsumerThread", target=execute_worker)
- t.start()
-
- # We want to ensure that the kernel is alive because in case of OOM or
other errors
- # Execution might be stuck there:
- #
https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
- while t.is_alive() and self.isKernelAlive():
- while not text_queue.empty():
- output = text_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.TEXT,
- output=output)
- while not html_queue.empty():
- output = html_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.HTML,
- output=output)
- while not stderr_queue.empty():
- output = stderr_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
- type=ipython_pb2.TEXT,
- output=output)
- while not png_queue.empty():
- output = png_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.PNG,
- output=output)
- while not jpeg_queue.empty():
- output = jpeg_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.JPEG,
- output=output)
-
+ with self._lock:
+ t.start()
+ # We want to ensure that the kernel is alive because in case of
OOM or other errors
+ # Execution might be stuck there:
+ #
https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
+ while t.is_alive() and self.isKernelAlive():
+ while not text_queue.empty():
+ output = text_queue.get()
+ yield
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+ type=ipython_pb2.TEXT,
+ output=output)
+ while not html_queue.empty():
+ output = html_queue.get()
+ yield
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+ type=ipython_pb2.HTML,
+ output=output)
+ while not stderr_queue.empty():
+ output = stderr_queue.get()
+ yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+ type=ipython_pb2.TEXT,
+ output=output)
+ while not png_queue.empty():
+ output = png_queue.get()
+ yield
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+ type=ipython_pb2.PNG,
+ output=output)
+ while not jpeg_queue.empty():
+ output = jpeg_queue.get()
+ yield
ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+ type=ipython_pb2.JPEG,
+ output=output)
# if kernel is not alive (should be same as thread is still alive),
means that we face
# an unexpected issue.
@@ -169,7 +172,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
return ipython_pb2.CancelResponse()
def complete(self, request, context):
- reply = self._kc.complete(request.code, request.cursor, reply=True,
timeout=None)
+ with self._lock:
+ reply = self._kc.complete(request.code, request.cursor,
reply=True, timeout=None)
return
ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
def status(self, request, context):
diff --git
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index e084bfe..87e5071 100644
---
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -27,12 +27,19 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static junit.framework.TestCase.assertTrue;
@@ -280,6 +287,49 @@ public class IPythonInterpreterTest extends
BasePythonInterpreterTest {
}
@Test
+ public void
testIpython_shouldNotHang_whenCallingAutoCompleteAndInterpretConcurrently()
+ throws InterpreterException,
+ InterruptedException, TimeoutException, ExecutionException {
+ tearDown();
+ Properties properties = initIntpProperties();
+ startInterpreter(properties);
+ final String code = "import time\n"
+ + "print(1)\n"
+ + "time.sleep(10)\n"
+ + "print(2)";
+ final String base = "time.";
+
+ // The goal of this test is to ensure that concurrent interpret and
complete
+ // will not make execute hang forever.
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ FutureTask<InterpreterResult> interpretFuture =
+ new FutureTask(new Callable() {
+ @Override
+ public Object call() throws Exception {
+ return interpreter.interpret(code, getInterpreterContext());
+ }
+ });
+ FutureTask<List<InterpreterCompletion>> completionFuture =
+ new FutureTask(new Callable() {
+ @Override
+ public Object call() throws Exception {
+ return interpreter.completion(base, base.length(),
getInterpreterContext());
+ }
+ });
+
+ pool.execute(interpretFuture);
+ // we sleep to ensure that the paragraph is running
+ Thread.sleep(3000);
+ pool.execute(completionFuture);
+
+ // We ensure that running and auto completion are not hanging.
+ InterpreterResult res = interpretFuture.get(20000, TimeUnit.MILLISECONDS);
+ List<InterpreterCompletion> autoRes = completionFuture.get(1000,
TimeUnit.MILLISECONDS);
+ assertTrue(res.code().name().equals("SUCCESS"));
+ assertTrue(autoRes.size() > 0);
+ }
+
+ @Test
public void testGrpcFrameSize() throws InterpreterException, IOException {
tearDown();