[ 
https://issues.apache.org/jira/browse/BEAM-8587?focusedWorklogId=340803&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340803
 ]

ASF GitHub Bot logged work on BEAM-8587:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Nov/19 23:06
            Start Date: 08/Nov/19 23:06
    Worklog Time Spent: 10m 
      Work Description: acrites commented on pull request #10041: [BEAM-8587] TestStream for Dataflow runner
URL: https://github.com/apache/beam/pull/10041#discussion_r344400618
 
 

 ##########
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##########
 @@ -1195,6 +1196,49 @@ def run__NativeWrite(self, transform_node, options):
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
 
+  @unittest.skip("This is not a test, despite the name.")
+  def run_TestStream(self, transform_node, options):
+    from apache_beam.portability.api import beam_runner_api_pb2
+    from apache_beam.testing.test_stream import ElementEvent
+    from apache_beam.testing.test_stream import ProcessingTimeEvent
+    from apache_beam.testing.test_stream import WatermarkEvent
+    standard_options = options.view_as(StandardOptions)
+    if not standard_options.streaming:
+      raise ValueError('TestStream is currently available for use '
+                       'only in streaming pipelines.')
+
+    transform = transform_node.transform
+    step = self._add_step(TransformNames.READ, transform_node.full_label,
+                          transform_node)
+    step.add_property(PropertyNames.FORMAT, 'test_stream')
+    test_stream_payload = beam_runner_api_pb2.TestStreamPayload()
+    # TestStream source doesn't do any decoding of elements,
+    # so we won't set test_stream_payload.coder_id.
+    output_coder = transform._infer_output_coder()  # pylint: disable=protected-access
+    for event in transform.events:
+      new_event = test_stream_payload.events.add()
+      if isinstance(event, ElementEvent):
+        for tv in event.timestamped_values:
+          element = new_event.element_event.elements.add()
+          element.encoded_element = output_coder.encode(tv.value)
+          element.timestamp = tv.timestamp.micros
 
 Review comment:
   I guess one path forward here would be to add new google.protobuf.Timestamp fields to TestStreamPayload, making it something like:
   
   message TestStreamPayload {
     // (Required) the coder for elements in the TestStream events
     string coder_id = 1;
   
     repeated Event events = 2;
   
     message Event {
       oneof event {
         AdvanceWatermark watermark_event = 1;
         AdvanceProcessingTime processing_time_event = 2;
         AddElements element_event = 3;
       }
   
       message AdvanceWatermark {
         int64 new_watermark = 1;
         Timestamp new_watermark_timestamp = 2;
       }
   
       message AdvanceProcessingTime {
         int64 advance_duration = 1;
         Timestamp advance_duration_timestamp = 2;
       }
   
       message AddElements {
         repeated TimestampedElement elements = 1;
       }
     }
   
     message TimestampedElement {
       bytes encoded_element = 1;
       int64 timestamp = 2;
       Timestamp timestamp_timestamp = 3; // just kidding, not quite sure what to name this one
     }
   }
   
   Then I can populate both the Timestamp and int64 versions in the payload for now. Separately, I can change all the internal code that processes the TestStreamPayload to use the new fields. As a third step, we could then deprecate the old fields.
   
   One thing to note is that the Python Timestamp objects only have microsecond resolution, so I'm guessing there's still a lot of work to be done to transition to nanos.
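
   To illustrate the dual-write step and the resolution point, here is a minimal sketch in plain Python. It does not use the real protobuf classes: ProtoTimestamp is a hypothetical stand-in for google.protobuf.Timestamp (seconds plus nanos), and the timestamp_proto key is a made-up name for the new field, not something from the PR.

```python
from typing import NamedTuple

MICROS_PER_SECOND = 1_000_000
NANOS_PER_MICRO = 1_000


class ProtoTimestamp(NamedTuple):
    """Hypothetical stand-in for google.protobuf.Timestamp."""
    seconds: int
    nanos: int


def micros_to_proto_timestamp(micros: int) -> ProtoTimestamp:
    """Split a microsecond timestamp into (seconds, nanos)."""
    # divmod floors toward negative infinity, so the remainder is always
    # non-negative, matching the Timestamp rule that nanos counts forward.
    seconds, rem_micros = divmod(micros, MICROS_PER_SECOND)
    # The source value only has microsecond resolution, so the last three
    # digits of nanos are always zero.
    return ProtoTimestamp(seconds, rem_micros * NANOS_PER_MICRO)


def dual_write(micros: int) -> dict:
    """Fill both the legacy int64 value and the (assumed) new field."""
    return {
        "timestamp": micros,  # legacy int64 micros
        "timestamp_proto": micros_to_proto_timestamp(micros),  # assumed name
    }
```

   Consumers could keep reading the int64 value until they are switched over to the new field, which is the point of writing both during the transition.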
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 340803)
    Time Spent: 40m  (was: 0.5h)

> Add TestStream support for Dataflow runner
> ------------------------------------------
>
>                 Key: BEAM-8587
>                 URL: https://issues.apache.org/jira/browse/BEAM-8587
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow, testing
>            Reporter: Andrew Crites
>            Assignee: Andrew Crites
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> TestStream support is needed to test features like late data and processing-time
> triggers on the local Dataflow runner.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
