This is an automated email from the ASF dual-hosted git repository.
kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new ce91b2e AVRO-2603: Context Managers to Manage Resources (#689)
ce91b2e is described below
commit ce91b2e67031bd0c0c6fd899ab76fc28d8d5dda4
Author: Michael A. Smith <[email protected]>
AuthorDate: Mon Oct 28 15:27:46 2019 -0400
AVRO-2603: Context Managers to Manage Resources (#689)
Make better use of Python `with` blocks to ensure that file descriptors and other
resources are closed.
---
lang/py/test/test_tether_word_count.py | 63 +++++++++++++++-------------------
1 file changed, 27 insertions(+), 36 deletions(-)
diff --git a/lang/py/test/test_tether_word_count.py b/lang/py/test/test_tether_word_count.py
index 38cd2d9..355621f 100644
--- a/lang/py/test/test_tether_word_count.py
+++ b/lang/py/test/test_tether_word_count.py
@@ -33,6 +33,17 @@ import avro.schema
import avro.tether.tether_task_runner
import set_avro_test_path
+_IN_SCHEMA = '"string"'
+
+# The schema for the output of the mapper and reducer
+_OUT_SCHEMA = """{
+ "type": "record",
+ "name": "Pair",
+ "namespace": "org.apache.avro.mapred",
+ "fields": [{"name": "key", "type": "string"},
+ {"name": "value", "type": "long", "order": "ignore"}]
+}"""
+
class TestTetherWordCount(unittest.TestCase):
"""unittest for a python tethered map-reduce job."""
@@ -53,18 +64,12 @@ class TestTetherWordCount(unittest.TestCase):
if not(os.path.exists(pdir)):
os.mkdir(pdir)
-
- with file(fname,'w') as hf:
- inschema="""{"type":"string"}"""
- writer = avro.datafile.DataFileWriter(hf, avro.io.DatumWriter(inschema),
writers_schema=avro.schema.parse(inschema))
+ datum_writer = avro.io.DatumWriter(_IN_SCHEMA)
+ writers_schema = avro.schema.parse(_IN_SCHEMA)
+ with avro.datafile.DataFileWriter(open(fname, 'wb'), datum_writer,
writers_schema) as writer:
for datum in lines:
writer.append(datum)
- writer.close()
-
-
-
-
def _count_words(self,lines):
"""Return a dictionary counting the words in lines
"""
@@ -91,8 +96,6 @@ class TestTetherWordCount(unittest.TestCase):
exfile = None
try:
-
-
# TODO we use the tempfile module to generate random names
# for the files
base_dir = "/tmp/test_tether_word_count"
@@ -112,22 +115,13 @@ class TestTetherWordCount(unittest.TestCase):
if not(os.path.exists(infile)):
self.fail("Missing the input file {0}".format(infile))
-
- # The schema for the output of the mapper and reducer
- oschema="""
-{"type":"record",
- "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
- {"name":"key","type":"string"},
- {"name":"value","type":"long","order":"ignore"}
- ]
-}
-"""
-
# write the schema to a temporary file
-
osfile=tempfile.NamedTemporaryFile(mode='w',suffix=".avsc",prefix="wordcount",delete=False)
- outschema=osfile.name
- osfile.write(oschema)
- osfile.close()
+ with tempfile.NamedTemporaryFile(mode='wb',
+ suffix=".avsc",
+ prefix="wordcount",
+ delete=False) as osfile:
+ osfile.write(_OUT_SCHEMA)
+ outschema = osfile.name
if not(os.path.exists(outschema)):
self.fail("Missing the schema file")
@@ -166,10 +160,11 @@ python -m avro.tether.tether_task_runner
word_count_task.WordCountTask
# path to where the tests lie
tpath=os.path.split(__file__)[0]
-
exhf=tempfile.NamedTemporaryFile(mode='w',prefix="exec_word_count_",delete=False)
+ with tempfile.NamedTemporaryFile(mode='wb',
+ prefix="exec_word_count_",
+ delete=False) as exhf:
+ exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
exfile=exhf.name
- exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
- exhf.close()
# make it world executable
os.chmod(exfile,0o755)
@@ -183,15 +178,11 @@ python -m avro.tether.tether_task_runner
word_count_task.WordCountTask
proc.wait()
# read the output
- with file(os.path.join(outpath,"part-00000.avro")) as hf:
- reader = avro.datafile.DataFileReader(hf, avro.io.DatumReader())
+ datum_reader = avro.io.DatumReader()
+ outfile = os.path.join(outpath, "part-00000.avro")
+ with avro.datafile.DataFileReader(open(outfile, 'rb'), datum_reader) as
reader:
for record in reader:
self.assertEqual(record["value"],true_counts[record["key"]])
-
- reader.close()
-
- except Exception as e:
- raise
finally:
# close the process
if proc is not None and proc.returncode is None: