Repository: beam Updated Branches: refs/heads/master 1a6f2e8f6 -> c01ed083e
Adding metrics() to DataflowPipelineResult. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac4185ed Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac4185ed Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac4185ed Branch: refs/heads/master Commit: ac4185ed3952397590947d79832fa0b1507cce19 Parents: 1a6f2e8 Author: Pablo <[email protected]> Authored: Thu Feb 2 13:42:58 2017 -0800 Committer: Ahmet Altay <[email protected]> Committed: Thu Feb 2 15:08:22 2017 -0800 ---------------------------------------------------------------------- .../runners/dataflow/dataflow_metrics.py | 33 ++++++++++++++++++++ .../runners/dataflow/dataflow_metrics_test.py | 20 ++++++++++++ .../apache_beam/runners/dataflow_runner.py | 4 +++ .../apache_beam/runners/dataflow_runner_test.py | 5 +++ sdks/python/apache_beam/runners/runner.py | 8 +++++ 5 files changed, 70 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py new file mode 100644 index 0000000..1d86f2f --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +DataflowRunner implementation of MetricResults. It is in charge of +responding to queries of current metrics by going to the dataflow +service. +""" + +from apache_beam.metrics.metric import MetricResults + + +# TODO(pabloem)(JIRA-1381) Implement this once metrics are queriable from +# dataflow service +class DataflowMetrics(MetricResults): + + def query(self, filter=None): + return {'counters': [], + 'distributions': []} http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py new file mode 100644 index 0000000..5475ac7 --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Tests corresponding to the DataflowRunner implementation of MetricsResult, +the DataflowMetrics class. +""" http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index abcc764..f02e24b 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -32,6 +32,7 @@ from apache_beam import pvalue from apache_beam.internal import json_value from apache_beam.internal import pickler from apache_beam.pvalue import PCollectionView +from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState @@ -648,6 +649,9 @@ class DataflowPipelineResult(PipelineResult): def job_id(self): return self._job.id + def metrics(self): + return DataflowMetrics() + @property def has_job(self): return self._job is not None http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py index 4983899..946e6a8 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py @@ -27,6 +27,11 @@ from apache_beam.runners.dataflow_runner import DataflowPipelineResult class DataflowRunnerTest(unittest.TestCase): + def test_dataflow_runner_has_metrics(self): + df_result = DataflowPipelineResult('somejob', 'somerunner') + self.assertTrue(df_result.metrics()) + self.assertTrue(df_result.metrics().query()) + @mock.patch('time.sleep', return_value=None) def test_wait_until_finish(self, patched_time_sleep): values_enum = dataflow_api.Job.CurrentStateValueValuesEnum http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 1a50df4..e14acb1 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -343,6 +343,14 @@ class PipelineResult(object): """ raise NotImplementedError + def metrics(self): + """Returns MetricsResult object to query metrics from the runner. + + Raises: + NotImplementedError: If the runner does not support this operation. + """ + raise NotImplementedError + # pylint: disable=unused-argument def aggregated_values(self, aggregator_or_name): """Return a dict of step names to values of the Aggregator."""
