This is an automated email from the ASF dual-hosted git repository.

mikhail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd6db3  [BEAM-7969] Report FnAPI counters as deltas in streaming 
jobs. (#9330)
8dd6db3 is described below

commit 8dd6db354df29c8a03f78e03185d087c4f028011
Author: Mikhail Gryzykhin <[email protected]>
AuthorDate: Thu Aug 22 11:24:11 2019 -0700

    [BEAM-7969] Report FnAPI counters as deltas in streaming jobs. (#9330)
    
    * Report FnAPI counters as deltas in streaming jobs.
    
    FnAPI counters were not reported in streaming worker.
    Streaming Dataflow jobs do not tie counters to workitems,
    but report them as deltas on same workitem instead.
    
    This change picks only latest counters per workitem
    from FnAPI harness, treats them as updates and reports
    to DFE.
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  31 +++-
 ...dataflow_exercise_streaming_metrics_pipeline.py | 100 +++++++++++++
 ...low_exercise_streaming_metrics_pipeline_test.py | 162 +++++++++++++++++++++
 3 files changed, 292 insertions(+), 1 deletion(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 57fd8d3..070212e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -400,9 +400,13 @@ public class StreamingDataflowWorker {
   private final StreamingDataflowWorkerOptions options;
   private final boolean windmillServiceEnabled;
   private final long clientId;
+
   private final MetricTrackingWindmillServerStub metricTrackingWindmillServer;
   private final CounterSet pendingDeltaCounters = new CounterSet();
   private final CounterSet pendingCumulativeCounters = new CounterSet();
+  private final java.util.concurrent.ConcurrentLinkedQueue<CounterUpdate> 
pendingMonitoringInfos =
+      new ConcurrentLinkedQueue<>();
+
   // Map from stage name to StageInfo containing metrics container registry 
and per stage counters.
   private final ConcurrentMap<String, StageInfo> stageInfoMap = new 
ConcurrentHashMap();
 
@@ -1311,6 +1315,9 @@ public class StreamingDataflowWorker {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      Iterables.addAll(
+          this.pendingMonitoringInfos, 
executionState.getWorkExecutor().extractMetricUpdates());
+
       commitCallbacks.putAll(executionState.getContext().flushState());
 
       // Release the execution state for another thread to use.
@@ -1869,7 +1876,6 @@ public class StreamingDataflowWorker {
   /** Sends counter updates to Dataflow backend. */
   private void sendWorkerUpdatesToDataflowService(
       CounterSet deltaCounters, CounterSet cumulativeCounters) throws 
IOException {
-
     // Throttle time is tracked by the windmillServer but is reported to DFE 
here.
     windmillQuotaThrottling.addValue(windmillServer.getAndResetThrottleTime());
     if (memoryMonitor.isThrashing()) {
@@ -1884,6 +1890,28 @@ public class StreamingDataflowWorker {
           cumulativeCounters.extractUpdates(false, 
DataflowCounterUpdateExtractor.INSTANCE));
       counterUpdates.addAll(
           
deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
+      if (hasExperiment(options, "beam_fn_api")) {
+        while (!this.pendingMonitoringInfos.isEmpty()) {
+          final CounterUpdate item = this.pendingMonitoringInfos.poll();
+
+          // This change will treat counter as delta.
+          // This is required because we receive cumulative results from FnAPI 
harness,
+          // while streaming job is expected to receive delta updates to 
counters on same
+          // WorkItem.
+          if (item.getCumulative()) {
+            item.setCumulative(false);
+          } else {
+            // In current world all counters coming from FnAPI are cumulative.
+            // This is a safety check in case new counter type appears in 
FnAPI.
+            throw new UnsupportedOperationException(
+                "FnApi counters are expected to provide cumulative values."
+                    + " Please, update convertion to delta logic"
+                    + " if non-cumulative counter type is required.");
+          }
+
+          counterUpdates.add(item);
+        }
+      }
     }
 
     // Handle duplicate counters from different stages. Store all the counters 
in a multi-map and
@@ -1943,6 +1971,7 @@ public class StreamingDataflowWorker {
             .setWorkItemId(WINDMILL_COUNTER_UPDATE_WORK_ID)
             .setErrors(errors)
             .setCounterUpdates(counterUpdates);
+
     workUnitClient.reportWorkItemStatus(workItemStatus);
 
     // Send any counters appearing more than once in subsequent RPCs:
diff --git 
a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline.py
 
b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline.py
new file mode 100644
index 0000000..fffd413
--- /dev/null
+++ 
b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline.py
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import time
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+SLEEP_TIME_SECS = 1
+
+
+class StreamingUserMetricsDoFn(beam.DoFn):
+  """Generates user metrics and outputs same element."""
+
+  def __init__(self):
+    self.double_message_counter = Metrics.counter(self.__class__,
+                                                  'double_msg_counter_name')
+    self.msg_len_dist_metric = Metrics.distribution(
+        self.__class__, 'msg_len_dist_metric_name')
+
+  def start_bundle(self):
+    time.sleep(SLEEP_TIME_SECS)
+
+  def process(self, element):
+    """Returns the processed element and increments the metrics."""
+
+    text_line = element.strip()
+
+    self.double_message_counter.inc()
+    self.double_message_counter.inc()
+    self.msg_len_dist_metric.update(len(text_line))
+
+    logging.debug("Done processing returning element array: '%s'", element)
+
+    return [element]
+
+  def finish_bundle(self):
+    time.sleep(SLEEP_TIME_SECS)
+
+
+def run(argv=None):
+  """Given an initialized Pipeline applies transforms and runs it."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--output_topic', required=True,
+      help=('Output PubSub topic of the form '
+            '"projects/<PROJECT>/topic/<TOPIC>".'))
+  group = parser.add_mutually_exclusive_group(required=True)
+  group.add_argument(
+      '--input_topic',
+      help=('Input PubSub topic of the form '
+            '"projects/<PROJECT>/topics/<TOPIC>".'))
+  group.add_argument(
+      '--input_subscription',
+      help=('Input PubSub subscription of the form '
+            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+  pipeline = beam.Pipeline(options=pipeline_options)
+
+  _ = (pipeline
+       | beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
+       | 'generate_metrics' >> (beam.ParDo(StreamingUserMetricsDoFn()))
+       | 'dump_to_pub' >> beam.io.WriteToPubSub(known_args.output_topic))
+
+  result = pipeline.run()
+  result.wait_until_finish()
+  return result
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
diff --git 
a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline_test.py
 
b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline_test.py
new file mode 100644
index 0000000..3e6db0e
--- /dev/null
+++ 
b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline_test.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.runners.dataflow import 
dataflow_exercise_streaming_metrics_pipeline
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import metric_result_matchers
+from apache_beam.testing import test_utils
+from apache_beam.testing.metric_result_matchers import DistributionMatcher
+from apache_beam.testing.metric_result_matchers import MetricResultMatcher
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'exercise_streaming_metrics_topic_input'
+INPUT_SUB = 'exercise_streaming_metrics_subscription_input'
+OUTPUT_TOPIC = 'exercise_streaming_metrics_topic_output'
+OUTPUT_SUB = 'exercise_streaming_metrics_subscription_output'
+
+WAIT_UNTIL_FINISH_DURATION = 1 * 60 * 1000  # in milliseconds
+MESSAGES_TO_PUBLISH = ["message a", "message b b", "message c"]
+
+SLEEP_TIME_SECS = 1
+
+
+class ExerciseStreamingMetricsPipelineTest(unittest.TestCase):
+
+  def setUp(self):
+    """Creates all required topics and subs."""
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.uuid = str(uuid.uuid4())
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic_name = INPUT_TOPIC + self.uuid
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.input_topic_name))
+
+    self.output_topic_name = OUTPUT_TOPIC + self.uuid
+    self.output_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.output_topic_name))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub_name = INPUT_SUB + self.uuid
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, self.input_sub_name),
+        self.input_topic.name)
+    self.output_sub_name = OUTPUT_SUB + self.uuid
+    self.output_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, self.output_sub_name),
+        self.output_topic.name,
+        ack_deadline_seconds=60)
+
+  def _inject_words(self, topic, messages):
+    """Inject messages as test data to PubSub."""
+    logging.debug('Injecting messages to topic %s', topic.name)
+    for msg in messages:
+      self.pub_client.publish(self.input_topic.name, msg.encode('utf-8'))
+    logging.debug('Done. Injecting messages to topic %s', topic.name)
+
+  def tearDown(self):
+    """Delete all created topics and subs."""
+    test_utils.cleanup_subscriptions(self.sub_client,
+                                     [self.input_sub, self.output_sub])
+    test_utils.cleanup_topics(self.pub_client,
+                              [self.input_topic, self.output_topic])
+
+  def run_pipeline(self):
+    # Waits for messages to appear in output topic.
+    expected_msg = [msg.encode('utf-8') for msg in MESSAGES_TO_PUBLISH]
+    pubsub_msg_verifier = PubSubMessageMatcher(self.project,
+                                               self.output_sub.name,
+                                               expected_msg,
+                                               timeout=600)
+
+    # Checks that pipeline initializes to RUNNING state.
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+
+    extra_opts = {'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
+                  'on_success_matcher': all_of(state_verifier,
+                                               pubsub_msg_verifier),
+                  'experiment': 'beam_fn_api',
+                  'input_subscription': self.input_sub.name,
+                  'output_topic': self.output_topic.name,
+                 }
+
+    argv = self.test_pipeline.get_full_options_as_args(**extra_opts)
+    return dataflow_exercise_streaming_metrics_pipeline.run(argv)
+
+  @attr('IT', 'ValidatesRunner')
+  def test_streaming_pipeline_returns_expected_user_metrics_fnapi_it(self):
+    """
+    Runs streaming Dataflow job and verifies that user metrics are reported
+    correctly.
+    """
+    self._inject_words(self.input_topic, MESSAGES_TO_PUBLISH)
+    result = self.run_pipeline()
+
+    METRIC_NAMESPACE = \
+      ('apache_beam.runners.dataflow.'
+       'dataflow_exercise_streaming_metrics_pipeline.StreamingUserMetricsDoFn')
+    matchers = [
+        # User Counter Metrics.
+        MetricResultMatcher(
+            name='double_msg_counter_name',
+            namespace=METRIC_NAMESPACE,
+            step='generate_metrics',
+            attempted=len(MESSAGES_TO_PUBLISH) * 2,
+            committed=len(MESSAGES_TO_PUBLISH) * 2
+        ),
+        MetricResultMatcher(
+            name='msg_len_dist_metric_name',
+            namespace=METRIC_NAMESPACE,
+            step='generate_metrics',
+            attempted=DistributionMatcher(
+                sum_value=len(''.join(MESSAGES_TO_PUBLISH)),
+                count_value=len(MESSAGES_TO_PUBLISH),
+                min_value=len(MESSAGES_TO_PUBLISH[0]),
+                max_value=len(MESSAGES_TO_PUBLISH[1])
+            ),
+            committed=DistributionMatcher(
+                sum_value=len(''.join(MESSAGES_TO_PUBLISH)),
+                count_value=len(MESSAGES_TO_PUBLISH),
+                min_value=len(MESSAGES_TO_PUBLISH[0]),
+                max_value=len(MESSAGES_TO_PUBLISH[1])
+            )
+        ),
+    ]
+
+    metrics = result.metrics().all_metrics()
+    errors = metric_result_matchers.verify_all(metrics, matchers)
+    self.assertFalse(errors, str(errors))
+
+
+if __name__ == '__main__':
+  unittest.main()

Reply via email to