ASF GitHub Bot logged work on BEAM-8587:

                Author: ASF GitHub Bot
            Created on: 08/Nov/19 21:57
            Start Date: 08/Nov/19 21:57
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #10041: [BEAM-8587] TestStream for Dataflow runner
URL: https://github.com/apache/beam/pull/10041#discussion_r344380809

 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 @@ -1195,6 +1196,49 @@ def run__NativeWrite(self, transform_node, options):
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+  @unittest.skip("This is not a test, despite the name.")
+  def run_TestStream(self, transform_node, options):
+    from apache_beam.portability.api import beam_runner_api_pb2
+    from apache_beam.testing.test_stream import ElementEvent
+    from apache_beam.testing.test_stream import ProcessingTimeEvent
+    from apache_beam.testing.test_stream import WatermarkEvent
+    standard_options = options.view_as(StandardOptions)
+    if not standard_options.streaming:
+      raise ValueError('TestStream is currently available for use '
+                       'only in streaming pipelines.')
+    transform = transform_node.transform
+    step = self._add_step(TransformNames.READ, transform_node.full_label,
+                          transform_node)
+    step.add_property(PropertyNames.FORMAT, 'test_stream')
+    test_stream_payload = beam_runner_api_pb2.TestStreamPayload()
+    # TestStream source doesn't do any decoding of elements,
+    # so we won't set test_stream_payload.coder_id.
+    output_coder = transform._infer_output_coder()  # pylint: 
+    for event in transform.events:
+      new_event = test_stream_payload.events.add()
+      if isinstance(event, ElementEvent):
+        for tv in event.timestamped_values:
+          element = new_event.element_event.elements.add()
+          element.encoded_element = output_coder.encode(tv.value)
+          element.timestamp = tv.timestamp.micros
 Review comment:
   TestStreamPayload should really use google.protobuf.Timestamp instead of
int64 so that we get support for nanos (here and below, for processing time).
   @robertwb has been working on migrating to nanos everywhere and wouldn't
want another place to migrate.
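
For illustration only, here is a minimal sketch of the conversion the comment implies: splitting an int64 microsecond timestamp into the `seconds` and `nanos` pair that google.protobuf.Timestamp carries. `TimestampParts` and both helper functions are hypothetical names that are not part of Beam or protobuf, and the sketch uses only the standard library rather than the real protobuf message.

```python
from typing import NamedTuple

NANOS_PER_MICRO = 1_000
MICROS_PER_SECOND = 1_000_000


class TimestampParts(NamedTuple):
    """Hypothetical stand-in for the seconds/nanos fields of google.protobuf.Timestamp."""
    seconds: int
    nanos: int


def micros_to_timestamp_parts(micros: int) -> TimestampParts:
    """Split microseconds since the epoch into whole seconds plus nanoseconds."""
    # divmod floors toward negative infinity, so the remainder is never negative.
    # That matches the Timestamp rule that nanos stays in [0, 999_999_999],
    # even for instants before the epoch.
    seconds, remainder_micros = divmod(micros, MICROS_PER_SECOND)
    return TimestampParts(seconds, remainder_micros * NANOS_PER_MICRO)


def timestamp_parts_to_micros(parts: TimestampParts) -> int:
    """Convert back to microseconds, truncating any sub-microsecond precision."""
    return parts.seconds * MICROS_PER_SECOND + parts.nanos // NANOS_PER_MICRO
```

A value such as `tv.timestamp.micros` from the diff above could be passed to `micros_to_timestamp_parts`. The reverse direction loses precision, which is the reason to store nanos in the payload from the start.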

Issue Time Tracking

    Worklog Id:     (was: 340764)
    Time Spent: 20m  (was: 10m)

> Add TestStream support for Dataflow runner
> ------------------------------------------
>                 Key: BEAM-8587
>                 URL: https://issues.apache.org/jira/browse/BEAM-8587
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow, testing
>            Reporter: Andrew Crites
>            Assignee: Andrew Crites
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
> TestStream support is needed to test features like late data and
> processing-time triggers on the local Dataflow runner.
