Updates the places in the SDK that create thread pools. Moves ThreadPool creation to a util function. Records and restores the logging level, since apitools resets it when used with a ThreadPool.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51afc1cc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51afc1cc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51afc1cc Branch: refs/heads/master Commit: 51afc1ccfe78a0657b5f9bc139d1d4e7938ed672 Parents: f29527f Author: Chamikara Jayalath <chamik...@google.com> Authored: Sat Jan 28 08:54:33 2017 -0800 Committer: Davor Bonaci <da...@google.com> Committed: Mon Jan 30 12:43:37 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/internal/util.py | 33 ++++++++++++++++++++++ sdks/python/apache_beam/io/filebasedsource.py | 17 +++-------- sdks/python/apache_beam/io/fileio.py | 11 ++------ 3 files changed, 40 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/internal/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index 2d12d49..5b31e88 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -17,6 +17,11 @@ """Utility functions used throughout the package.""" +import logging +from multiprocessing.pool import ThreadPool +import threading +import weakref + class ArgumentPlaceholder(object): """A place holder object replacing PValues in argument lists. @@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values): (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v) for k, v in sorted(kwargs.iteritems())) return (new_args, new_kwargs) + + +def run_using_threadpool(fn_to_execute, inputs, pool_size): + """Runs the given function on given inputs using a thread pool. 
+ + Args: + fn_to_execute: Function to execute + inputs: Inputs on which given function will be executed in parallel. + pool_size: Size of thread pool. + Returns: + Results retrieved after executing the given function on given inputs. + """ + + # ThreadPool crashes in old versions of Python (< 2.7.5) if created + # from a child thread. (http://bugs.python.org/issue10015) + if not hasattr(threading.current_thread(), '_children'): + threading.current_thread()._children = weakref.WeakKeyDictionary() + pool = ThreadPool(min(pool_size, len(inputs))) + try: + # We record and reset logging level here since 'apitools' library Beam + # depends on updates the logging level when used with a threadpool - + # https://github.com/google/apitools/issues/141 + # TODO: Remove this once above issue in 'apitools' is fixed. + old_level = logging.getLogger().level + return pool.map(fn_to_execute, inputs) + finally: + pool.terminate() + logging.getLogger().setLevel(old_level) http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 1bfde25..582d673 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -26,11 +26,9 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``. 
""" import random -import threading -import weakref -from multiprocessing.pool import ThreadPool from apache_beam.internal import pickler +from apache_beam.internal import util from apache_beam.io import concat_source from apache_beam.io import fileio from apache_beam.io import iobase @@ -158,16 +156,9 @@ class FileBasedSource(iobase.BoundedSource): return [fileio.ChannelFactory.size_in_bytes(file_names[0])] else: if pattern is None: - # ThreadPool crashes in old versions of Python (< 2.7.5) if created - # from a child thread. (http://bugs.python.org/issue10015) - if not hasattr(threading.current_thread(), '_children'): - threading.current_thread()._children = weakref.WeakKeyDictionary() - pool = ThreadPool( - min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names))) - try: - return pool.map(fileio.ChannelFactory.size_in_bytes, file_names) - finally: - pool.terminate() + return util.run_using_threadpool( + fileio.ChannelFactory.size_in_bytes, file_names, + MAX_NUM_THREADS_FOR_SIZE_ESTIMATION) else: file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern, file_names) http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index f67dca9..97cf387 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -22,16 +22,14 @@ import bz2 import cStringIO import glob import logging -from multiprocessing.pool import ThreadPool import os import re import shutil -import threading import time import zlib -import weakref from apache_beam import coders +from apache_beam.internal import util from apache_beam.io import gcsio from apache_beam.io import iobase from apache_beam.transforms.display import DisplayDataItem @@ -663,11 +661,8 @@ class FileSink(iobase.Sink): logging.debug('Rename successful: %s -> %s', src, dest) return exceptions - # 
ThreadPool crashes in old versions of Python (< 2.7.5) if created from a - # child thread. (http://bugs.python.org/issue10015) - if not hasattr(threading.current_thread(), '_children'): - threading.current_thread()._children = weakref.WeakKeyDictionary() - exception_batches = ThreadPool(num_threads).map(_rename_batch, batches) + exception_batches = util.run_using_threadpool( + _rename_batch, batches, num_threads) all_exceptions = [] for exceptions in exception_batches: