Repository: beam
Updated Branches:
  refs/heads/master 69343a609 -> 75d7b273c
[BEAM-1708] Improve error message when GCP not installed

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/233a9bd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/233a9bd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/233a9bd3

Branch: refs/heads/master
Commit: 233a9bd3c503182dee004ac52acf37ace4eeac12
Parents: 69343a6
Author: Sourabh Bajaj <[email protected]>
Authored: Mon Apr 10 14:45:01 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Thu Apr 13 14:54:37 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filesystems_util.py  |  7 ++++++-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 19 +++++++++++++++++++
 .../io/gcp/datastore/v1/datastoreio.py          | 20 ++++++++++++++++++++
 .../runners/dataflow/dataflow_runner.py         |  9 +++++++--
 sdks/python/apache_beam/runners/runner.py       | 10 +++++++++-
 5 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
index 47c2361..6d21298 100644
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -25,7 +25,12 @@ def get_filesystem(path):
   provided in the input.
   """
   if path.startswith('gs://'):
-    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+    try:
+      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+    except ImportError:
+      raise ImportError(
+          'Google Cloud Platform IO not available, '
+          'please install apache_beam[gcp]')
     return GCSFileSystem()
   else:
     return LocalFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 788c069..9a8174a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -367,6 +367,15 @@ class BigQuerySource(dataflow_io.NativeSource):
         (3) both a table and a query is specified.
     """
 
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     if table is not None and query is not None:
       raise ValueError('Both a BigQuery table and a query were specified.'
                        ' Please specify only one of these.')
@@ -467,6 +476,16 @@ class BigQuerySink(dataflow_io.NativeSink):
       ValueError: if the table reference as a string does not match the
         expected format.
     """
+
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     self.table_reference = _parse_table_reference(table, dataset, project)
     # Transform the table schema into a bigquery.TableSchema instance.
     if isinstance(schema, basestring):

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index af0c72b..e8ca05d 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -92,6 +92,16 @@ class ReadFromDatastore(PTransform):
       namespace: An optional namespace.
       num_splits: Number of splits for the query.
     """
+
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     logging.warning('datastoreio read transform is experimental.')
     super(ReadFromDatastore, self).__init__()
 
@@ -368,6 +378,16 @@ class _Mutate(PTransform):
 
 class WriteToDatastore(_Mutate):
   """A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore."""
   def __init__(self, project):
+
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     super(WriteToDatastore, self).__init__(
         project, WriteToDatastore.to_upsert_mutation)

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a935c1..1a92c26 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -152,8 +152,13 @@ class DataflowRunner(PipelineRunner):
   def run(self, pipeline):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.internal import apiclient
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.runners.dataflow.internal import apiclient
+    except ImportError:
+      raise ImportError(
+          'Google Cloud Dataflow runner not available, '
+          'please install apache_beam[gcp]')
     self.job = apiclient.Job(pipeline.options)
 
     # The superclass's run will trigger a traversal of all reachable nodes.

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index de9c892..7e7ec24 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -78,7 +78,15 @@ def create_runner(runner_name):
 
   if '.' in runner_name:
     module, runner = runner_name.rsplit('.', 1)
-    return getattr(__import__(module, {}, {}, [runner], -1), runner)()
+    try:
+      return getattr(__import__(module, {}, {}, [runner], -1), runner)()
+    except ImportError:
+      if runner_name in _KNOWN_DATAFLOW_RUNNERS:
+        raise ImportError(
+            'Google Cloud Dataflow runner not available, '
+            'please install apache_beam[gcp]')
+      else:
+        raise
   else:
     raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are %s '
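
A minimal sketch of how the new message surfaces, assuming apache_beam was installed without the [gcp] extra; the gs:// path below is purely illustrative:

    # Hypothetical reproduction; assumes the gcp extra is not installed.
    from apache_beam.io.filesystems_util import get_filesystem

    try:
      # Any gs:// path triggers the guarded GCSFileSystem import above.
      get_filesystem('gs://some-bucket/some-object')
    except ImportError as e:
      # With this change the error reads:
      # "Google Cloud Platform IO not available, please install apache_beam[gcp]"
      print(e)

Installing the SDK with the gcp extra, as the message suggests (for example pip install apache_beam[gcp]), makes the guarded imports succeed again.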
