This is an automated email from the ASF dual-hosted git repository.
rskraba pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new 950a14b AVRO-2889: Fix Test Threading Race Condition (#932)
950a14b is described below
commit 950a14b3d9abd97d0879aa3b66eb677036c818eb
Author: Michael A. Smith <[email protected]>
AuthorDate: Thu Jul 16 08:52:51 2020 -0400
AVRO-2889: Fix Test Threading Race Condition (#932)
---
lang/py/avro/test/test_tether_task_runner.py | 6 ++++++
lang/py/avro/tether/tether_task_runner.py | 14 ++++++--------
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/lang/py/avro/test/test_tether_task_runner.py
b/lang/py/avro/test/test_tether_task_runner.py
index 10582d3..90e01cb 100644
--- a/lang/py/avro/test/test_tether_task_runner.py
+++ b/lang/py/avro/test/test_tether_task_runner.py
@@ -66,6 +66,12 @@ class TestTetherTaskRunner(unittest.TestCase):
runner =
avro.tether.tether_task_runner.TaskRunner(avro.test.word_count_task.WordCountTask())
runner.start(outputport=parent_port, join=False)
+ for _ in range(12):
+ if runner.server is not None:
+ break
+ time.sleep(1)
+ else:
+ raise RuntimeError("Server never started")
# Test sending various messages to the server and ensuring they
are processed correctly
requestor = avro.tether.tether_task.HTTPRequestor(
diff --git a/lang/py/avro/tether/tether_task_runner.py
b/lang/py/avro/tether/tether_task_runner.py
index 602625c..2e00c61 100644
--- a/lang/py/avro/tether/tether_task_runner.py
+++ b/lang/py/avro/tether/tether_task_runner.py
@@ -141,6 +141,9 @@ class TaskRunner(object):
implements the logic for the mapper and reducer phases
"""
+ server = None
+ sthread = None
+
def __init__(self, task):
"""
Construct the runner
@@ -149,15 +152,11 @@ class TaskRunner(object):
---------------------------------------------------------------
task - An instance of tether task
"""
-
self.log = logging.getLogger("TaskRunner:")
-
- if not(isinstance(task, avro.tether.tether_task.TetherTask)):
- raise ValueError("task must be an instance of tether task")
self.task = task
- self.server = None
- self.sthread = None
+ if not isinstance(task, avro.tether.tether_task.TetherTask):
+ raise ValueError("task must be an instance of tether task")
def start(self, outputport=None, join=True):
"""
@@ -175,7 +174,6 @@ class TaskRunner(object):
we can resume execution in this thread so that we can do
additional
testing
"""
-
port = avro.tether.util.find_port()
address = ("localhost", port)
@@ -189,7 +187,7 @@ class TaskRunner(object):
sthread.start()
self.sthread = sthread
- # This needs to run in a separat thread b\c serve_forever() blocks
+ # This needs to run in a separate thread because serve_forever()
blocks.
self.task.open(port, clientPort=outputport)
# wait for the other thread to finish