scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1237012279


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -211,7 +211,11 @@ public CommitWorkResponse commitWork(Windmill.CommitWorkRequest request) {
     validateCommitWorkRequest(request);
     for (ComputationCommitWorkRequest computationRequest : request.getRequestsList()) {
       for (WorkItemCommitRequest commit : computationRequest.getRequestsList()) {
-        commitsReceived.put(commit.getWorkToken(), commit);
+        // Throw away per work item latency attributions because they are not deterministic in tests

Review Comment:
   Should we have an option, or a separate method, to do this after pulling commits off of commitsReceived?
   That would let us have a test that verifies the attributions are reasonable.
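A minimal sketch of the "option" variant under a simplified model. `FakeCommit` and `FakeCommitServer` are hypothetical stand-ins, not the real `WorkItemCommitRequest` or `FakeWindmillServer`. Dropping stays the default, and a single test can turn it off to check that the attributions it gets back look sane (here, only non-negative).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for WorkItemCommitRequest, reduced to a work token and
// per-stage latencies (milliseconds) that differ from run to run.
final class FakeCommit {
  final long workToken;
  final List<Long> latencyAttributionsMillis;

  FakeCommit(long workToken, List<Long> latencyAttributionsMillis) {
    this.workToken = workToken;
    this.latencyAttributionsMillis =
        Collections.unmodifiableList(new ArrayList<>(latencyAttributionsMillis));
  }
}

// Hypothetical fake server where dropping the attributions is an option a test
// can turn off, rather than unconditional behavior.
final class FakeCommitServer {
  private final Map<Long, FakeCommit> commitsReceived = new HashMap<>();
  // Defaults to dropping so existing exact-match tests stay deterministic.
  private boolean dropLatencyAttributions = true;

  void setDropLatencyAttributions(boolean drop) {
    this.dropLatencyAttributions = drop;
  }

  void commitWork(FakeCommit commit) {
    FakeCommit stored =
        dropLatencyAttributions
            ? new FakeCommit(commit.workToken, Collections.emptyList())
            : commit;
    commitsReceived.put(commit.workToken, stored);
  }

  FakeCommit getCommit(long workToken) {
    return commitsReceived.get(workToken);
  }
}
```

Defaulting to dropping leaves the existing exact-match assertions untouched; a separate accessor applied after pulling commits off `commitsReceived` would work equally well.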



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -273,7 +277,11 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep
                     computationWork.getInputDataWatermark());
             for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
               receiver.receiveWork(
-                  computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem);
+                  computationWork.getComputationId(),
+                  inputDataWatermark,
+                  Instant.now(),
+                  workItem,
+                  new ArrayList<>());

Review Comment:
   nit: Collections.emptyList()?
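For context on the nit: `Collections.emptyList()` returns an immutable empty list, and its Javadoc allows implementations to avoid creating a new object per call, unlike `new ArrayList<>()`. The one thing to confirm is that nothing downstream mutates the argument, since `add` on it throws `UnsupportedOperationException`. `EmptyListNit.receiveWork` below is a hypothetical read-only stand-in, not the real receiver signature.

```java
import java.util.Collections;
import java.util.List;

final class EmptyListNit {
  // Hypothetical stand-in for the receiver's new list parameter; it only reads it.
  static int receiveWork(String computationId, List<String> timingInfos) {
    return timingInfos.size();
  }

  // Returns true if the list rejects mutation, as Collections.emptyList() does.
  static boolean rejectsAdd(List<String> list) {
    try {
      list.add("timing-info");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  private EmptyListNit() {}

  // Both arguments behave the same for a read-only callee.
  static int demo() {
    return receiveWork("computation", Collections.emptyList());
  }
}
```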



##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -538,6 +574,10 @@ message StreamingGetWorkResponseChunk {
   // from other stream_ids may be interleaved on the physical stream.
   optional fixed64 stream_id = 4;
 
+  // Timing infos for the work item. Windmill Dispatcher and user worker should
+  // propagate critical event timings if the list is not empty.
+  repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;

Review Comment:
   Below, in StreamingCommitWorkRequest, should we have a similar field?
   repeated CommitWorkStreamingTimingInfo timing_infos = 8;
   
   That would let us attribute latency on the commit path in the same way (fine to do this as a follow-up, too).
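A hedged sketch of how commit-path timing records could be turned into per-stage latencies, assuming each record carries a stage label and a microsecond timestamp marking when that stage began. `TimingEvent`, the stage names, and the rule of charging the gap between consecutive events to the earlier stage are illustrative assumptions, not the fields of `GetWorkStreamTimingInfo` or of the proposed `CommitWorkStreamingTimingInfo`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical timing record: a stage label and the time (usec) that stage began.
final class TimingEvent {
  final String stage;
  final long timestampUsec;

  TimingEvent(String stage, long timestampUsec) {
    this.stage = stage;
    this.timestampUsec = timestampUsec;
  }
}

final class CommitLatencyAttribution {
  private CommitLatencyAttribution() {}

  // Charges the gap between consecutive events to the earlier event's stage.
  // Events are expected in pipeline order; the last event only closes the
  // previous stage. A negative gap (e.g. clock skew between hosts) counts as zero.
  static Map<String, Long> attribute(List<TimingEvent> events) {
    Map<String, Long> microsPerStage = new LinkedHashMap<>();
    for (int i = 0; i + 1 < events.size(); i++) {
      TimingEvent start = events.get(i);
      long delta = events.get(i + 1).timestampUsec - start.timestampUsec;
      microsPerStage.merge(start.stage, Math.max(0L, delta), Long::sum);
    }
    return microsPerStage;
  }
}
```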



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -367,6 +375,9 @@ public boolean commitWorkItem(
         errorCollector.checkThat(
             request.getShardingKey(), allOf(greaterThan(0L), lessThan(Long.MAX_VALUE)));
         errorCollector.checkThat(request.getCacheToken(), not(equalTo(0L)));
+        // Throw away per work item latency attributions because they are not deterministic in tests
+        // for valid comparison.
+        request = request.toBuilder().clearPerWorkItemLatencyAttributions().build();

Review Comment:
   Ditto (or perhaps only here in the stream case, since we're not doing this for appliance).



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3067,8 +3069,14 @@ public void testExceptionInvalidatesCache() throws Exception {
 
       assertThat(
           // The commit will include a timer to clean up state - this timer is irrelevant
-          // for the current test. Also remove source_bytes_processed because it's dynamic.
-          setValuesTimestamps(commit.toBuilder().clearOutputTimers().clearSourceBytesProcessed())
+          // for the current test. Also remove source_bytes_processed and
+          // per_work_item_latecy_attributions because they're dynamic.

Review Comment:
   i.e., you could extract this into a helper method
   removeDynamicFields(CommitWorkRequest request)
   
   and use it here and in other places, instead of dropping the fields in the "server".
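A sketch of that helper under a simplified model. With the real generated proto, the body could be `request.toBuilder().clearSourceBytesProcessed().clearPerWorkItemLatencyAttributions().build()`, since both `clear` methods appear in the diffs above. `CommitSnapshot` and `CommitTestUtils` are hypothetical stand-ins so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical immutable stand-in for a commit request with two run-dependent fields.
final class CommitSnapshot {
  final long workToken;
  final long sourceBytesProcessed; // varies between runs
  final List<Long> latencyAttributionsMillis; // varies between runs

  CommitSnapshot(long workToken, long sourceBytesProcessed, List<Long> latencyAttributionsMillis) {
    this.workToken = workToken;
    this.sourceBytesProcessed = sourceBytesProcessed;
    this.latencyAttributionsMillis =
        Collections.unmodifiableList(new ArrayList<>(latencyAttributionsMillis));
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof CommitSnapshot)) {
      return false;
    }
    CommitSnapshot other = (CommitSnapshot) o;
    return workToken == other.workToken
        && sourceBytesProcessed == other.sourceBytesProcessed
        && latencyAttributionsMillis.equals(other.latencyAttributionsMillis);
  }

  @Override
  public int hashCode() {
    return Objects.hash(workToken, sourceBytesProcessed, latencyAttributionsMillis);
  }
}

final class CommitTestUtils {
  private CommitTestUtils() {}

  // Hypothetical test helper: resets every run-dependent field so commits can be
  // compared with equals(). The fake server keeps storing the raw commits.
  static CommitSnapshot removeDynamicFields(CommitSnapshot commit) {
    return new CommitSnapshot(commit.workToken, 0L, Collections.emptyList());
  }
}
```

Keeping the stripping in the test means the fake server stays a faithful recorder, and any test that wants to inspect the attributions can simply skip the helper.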



-- 
This is an automated message from the Apache Git Service.