[ 
https://issues.apache.org/jira/browse/BEAM-8587?focusedWorklogId=341509&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341509
 ]

ASF GitHub Bot logged work on BEAM-8587:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Nov/19 22:05
            Start Date: 11/Nov/19 22:05
    Worklog Time Spent: 10m 
      Work Description: acrites commented on pull request #10041: [BEAM-8587] TestStream for Dataflow runner
URL: https://github.com/apache/beam/pull/10041#discussion_r344931199
 
 

 ##########
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##########
 @@ -1195,6 +1196,49 @@ def run__NativeWrite(self, transform_node, options):
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
 
+  @unittest.skip("This is not a test, despite the name.")
+  def run_TestStream(self, transform_node, options):
+    from apache_beam.portability.api import beam_runner_api_pb2
+    from apache_beam.testing.test_stream import ElementEvent
+    from apache_beam.testing.test_stream import ProcessingTimeEvent
+    from apache_beam.testing.test_stream import WatermarkEvent
+    standard_options = options.view_as(StandardOptions)
+    if not standard_options.streaming:
+      raise ValueError('TestStream is currently available for use '
+                       'only in streaming pipelines.')
+
+    transform = transform_node.transform
+    step = self._add_step(TransformNames.READ, transform_node.full_label,
+                          transform_node)
+    step.add_property(PropertyNames.FORMAT, 'test_stream')
+    test_stream_payload = beam_runner_api_pb2.TestStreamPayload()
+    # TestStream source doesn't do any decoding of elements,
+    # so we won't set test_stream_payload.coder_id.
+    output_coder = transform._infer_output_coder()  # pylint: disable=protected-access
+    for event in transform.events:
+      new_event = test_stream_payload.events.add()
+      if isinstance(event, ElementEvent):
+        for tv in event.timestamped_values:
+          element = new_event.element_event.elements.add()
+          element.encoded_element = output_coder.encode(tv.value)
+          element.timestamp = tv.timestamp.micros
 
 Review comment:
   We're punting on this for now due to differences in what ranges of times 
protobuf Timestamp and Duration allow vs. what Beam allows.
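
   To illustrate the range mismatch mentioned above, here is a minimal
   stdlib-only sketch. It assumes timestamps are signed 64-bit microsecond
   counts (the diff reads tv.timestamp.micros, but the exact Beam bounds are
   not taken from this thread) and uses the documented protobuf Timestamp
   range of 0001-01-01T00:00:00Z through 9999-12-31T23:59:59.999999999Z. The
   helper names are hypothetical and are not part of Beam or protobuf.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
_ONE_SECOND = timedelta(seconds=1)

# Documented bounds of google.protobuf.Timestamp, in whole seconds since epoch.
PROTO_MIN_SECONDS = (datetime(1, 1, 1, tzinfo=timezone.utc) - _EPOCH) // _ONE_SECOND
PROTO_MAX_SECONDS = (
    datetime(9999, 12, 31, 23, 59, 59, tzinfo=timezone.utc) - _EPOCH
) // _ONE_SECOND

# Assumption: the widest values a signed 64-bit microsecond timestamp can hold.
INT64_MIN_MICROS = -2**63
INT64_MAX_MICROS = 2**63 - 1


def fits_proto_timestamp(micros: int) -> bool:
    """Return True if a microsecond timestamp lies inside the protobuf range."""
    low = PROTO_MIN_SECONDS * 1_000_000
    high = PROTO_MAX_SECONDS * 1_000_000 + 999_999
    return low <= micros <= high


def micros_to_seconds_nanos(micros: int) -> tuple:
    """Split microseconds into (seconds, nanos) with nanos always >= 0."""
    # divmod floors toward negative infinity, so the remainder is non-negative,
    # which matches how protobuf Timestamp expects negative times to be encoded.
    seconds, remainder = divmod(micros, 1_000_000)
    return seconds, remainder * 1000


if __name__ == "__main__":
    for label, value in [("epoch", 0),
                         ("int64 min", INT64_MIN_MICROS),
                         ("int64 max", INT64_MAX_MICROS)]:
        print(label, fits_proto_timestamp(value))
```

   Under these assumptions, the extreme int64 microsecond values fall far
   outside what a protobuf Timestamp can represent, which is one way the two
   ranges can disagree; a raw integer field avoids the conversion entirely.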
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 341509)
    Time Spent: 1h 10m  (was: 1h)

> Add TestStream support for Dataflow runner
> ------------------------------------------
>
>                 Key: BEAM-8587
>                 URL: https://issues.apache.org/jira/browse/BEAM-8587
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow, testing
>            Reporter: Andrew Crites
>            Assignee: Andrew Crites
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> TestStream support is needed to test features such as late data and 
> processing-time triggers on the local Dataflow runner.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
