[ https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=340747&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340747 ]

ASF GitHub Bot logged work on BEAM-8554:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Nov/19 20:50
            Start Date: 08/Nov/19 20:50
    Worklog Time Spent: 10m 
      Work Description: stevekoonce commented on pull request #10013: [BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r344361066
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 ##########
 @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws Exception {
     assertEquals(2, result.size());
     assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), result.get(2L));
     assertTrue(result.containsKey(1L));
-    assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-    assertTrue(result.get(1L).getSerializedSize() > 1000);
 
-    // Spam worker updates a few times.
-    int maxTries = 10;
-    while (--maxTries > 0) {
-      worker.reportPeriodicWorkerUpdates();
-      Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-    }
+    WorkItemCommitRequest largeCommit = result.get(1L);
+    assertEquals("large_key", largeCommit.getKey().toStringUtf8());
 
-    // We should see an exception reported for the large commit but not the small one.
-    ArgumentCaptor<WorkItemStatus> workItemStatusCaptor =
-        ArgumentCaptor.forClass(WorkItemStatus.class);
-    verify(mockWorkUnitClient, atLeast(2)).reportWorkItemStatus(workItemStatusCaptor.capture());
-    List<WorkItemStatus> capturedStatuses = workItemStatusCaptor.getAllValues();
-    boolean foundErrors = false;
-    for (WorkItemStatus status : capturedStatuses) {
-      if (!status.getErrors().isEmpty()) {
-        assertFalse(foundErrors);
-        foundErrors = true;
-        String errorMessage = status.getErrors().get(0).getMessage();
-        assertThat(errorMessage, Matchers.containsString("KeyCommitTooLargeException"));
-      }
-    }
-    assertTrue(foundErrors);
-  }
-
-  @Test
-  public void testKeyCommitTooLargeException_StreamingEngine() throws Exception {
-    KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
-
-    List<ParallelInstruction> instructions =
-        Arrays.asList(
-            makeSourceInstruction(kvCoder),
-            makeDoFnInstruction(new LargeCommitFn(), 0, kvCoder),
-            makeSinkInstruction(kvCoder, 1));
-
-    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
-    server.setExpectedExceptionCount(1);
-
-    StreamingDataflowWorkerOptions options =
-        createTestingPipelineOptions(server, "--experiments=enable_streaming_engine");
-    StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
-    worker.setMaxWorkItemCommitBytes(1000);
-    worker.start();
-
-    server.addWorkToOffer(makeInput(1, 0, "large_key"));
-    server.addWorkToOffer(makeInput(2, 0, "key"));
-    server.waitForEmptyWorkQueue();
-
-    Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1);
-
-    assertEquals(2, result.size());
-    assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
-    assertTrue(result.containsKey(1L));
-    assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-    assertTrue(result.get(1L).getSerializedSize() > 1000);
+    // The large commit should have its flag set, marking it for truncation.
+    assertTrue(largeCommit.getExceedsMaxWorkItemCommitBytes());
+    assertTrue(largeCommit.getSerializedSize() < 100);
 
 Review comment:
   Yes.  I needed to keep the estimated bytes field check given the method for 
generating the expected truncated commit, but should have cleaned up the 
others.  Doing that now.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 340747)
    Time Spent: 2h 20m  (was: 2h 10m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -----------------------------------------------------------------------------------------
>
>                 Key: BEAM-8554
>                 URL: https://issues.apache.org/jira/browse/BEAM-8554
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>            Reporter: Steve Koonce
>            Priority: Minor
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> +Background:+
> When a WorkItemCommitRequest is generated that's bigger than the permitted 
> size (> ~180 MB), a KeyCommitTooLargeException is logged (_not thrown_) and 
> the request is still sent to the service.  The service rejects the commit, 
> but breaks up input messages that were bundled together and adds them to new, 
> smaller work items that will later be pulled and retried - likely without 
> generating another commit that is too large.
> When a WorkItemCommitRequest is generated that's too large to be sent back to 
> the service (> 2 GB), a KeyCommitTooLargeException is thrown and nothing is 
> sent back to the service.
>  
> +Proposed Improvement+
> In both cases, prevent the doomed, large commit from being sent back to the 
> service.  Instead, send flags in the commit request signaling that the current 
> work item produced a commit that is too large and should be broken up.
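The flag-based signaling described above can be sketched in Java. This is a hypothetical, self-contained model, not the actual Windmill protobuf API: the class and field names below mirror WorkItemCommitRequest and its ExceedsMaxWorkItemCommitBytes flag, but the simplified constructor and size accounting are assumptions for illustration.

```java
// Hypothetical sketch of the proposed truncation logic. NOT the real Windmill
// protobuf API: this is a simplified stand-in whose names mirror
// WorkItemCommitRequest and its ExceedsMaxWorkItemCommitBytes flag.
public class CommitTruncationSketch {

  /** Simplified stand-in for Windmill's WorkItemCommitRequest proto. */
  static final class WorkItemCommitRequest {
    final String key;
    final byte[] payload;
    final boolean exceedsMaxWorkItemCommitBytes;

    WorkItemCommitRequest(String key, byte[] payload, boolean exceedsMaxWorkItemCommitBytes) {
      this.key = key;
      this.payload = payload;
      this.exceedsMaxWorkItemCommitBytes = exceedsMaxWorkItemCommitBytes;
    }

    /** Rough serialized size: key bytes plus payload bytes (an assumption). */
    int serializedSize() {
      return key.getBytes(java.nio.charset.StandardCharsets.UTF_8).length + payload.length;
    }
  }

  /**
   * If the commit exceeds the limit, do not send the doomed payload. Instead,
   * send a minimal commit that keeps only the key and sets the "too large"
   * flag, so the service can break the work item up and re-queue the pieces.
   */
  static WorkItemCommitRequest truncateIfTooLarge(WorkItemCommitRequest commit, int maxBytes) {
    if (commit.serializedSize() <= maxBytes) {
      return commit; // Small enough: send it unchanged.
    }
    return new WorkItemCommitRequest(commit.key, new byte[0], true);
  }

  public static void main(String[] args) {
    WorkItemCommitRequest large =
        new WorkItemCommitRequest("large_key", new byte[2000], false);
    WorkItemCommitRequest sent = truncateIfTooLarge(large, 1000);
    // The oversized commit is replaced by a tiny, flagged one.
    System.out.println("flagged=" + sent.exceedsMaxWorkItemCommitBytes);
    System.out.println("size=" + sent.serializedSize());
  }
}
```

This mirrors what the updated test asserts: the large commit comes back with the flag set and a serialized size far below the limit, instead of a payload the service would reject.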



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
