[ 
https://issues.apache.org/jira/browse/BEAM-10703?focusedWorklogId=473058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-473058
 ]

ASF GitHub Bot logged work on BEAM-10703:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Aug/20 19:50
            Start Date: 20/Aug/20 19:50
    Worklog Time Spent: 10m 
      Work Description: nehsyc commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r474234594



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -72,25 +106,21 @@
         CacheBuilder.newBuilder()
             .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
             .removalListener(
-                (RemovalNotification<KV<String, ByteString>, CacheEntry> 
notification) -> {
+                (RemovalNotification<CacheKey, CacheEntry> notification) -> {
                   if (notification.getCause() != RemovalCause.EXPLICIT) {
-                    LOG.info("Closing idle reader for {}", 
keyToString(notification.getKey()));
+                    LOG.info("Closing idle reader for {}", 
notification.getKey().toString());

Review comment:
       Done.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = 
Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 
1);

Review comment:
       Done.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = 
Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 
1);
+    ShardedKey key1_shard2 = new ShardedKey(ByteString.copyFromUtf8("key1"), 
2);

Review comment:
       Done.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -1095,14 +1098,46 @@ public void run() {
             }
           }
         };
-    if (!computationState.activateWork(workItem.getKey(), work)) {
+    if (!computationState.activateWork(
+        new ShardedKey(workItem.getKey(), workItem.getShardingKey()), work)) {
       // Free worker if the work was not activated.
       // This can happen if it's duplicate work or some other reason.
       sdkHarnessRegistry.completeWork(worker);
     }
   }
 
+  static class ShardedKey {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 473058)
    Time Spent: 2h  (was: 1h 50m)

> Add support for auto-sharded GroupIntoBatches in Dataflow runner
> ----------------------------------------------------------------
>
>                 Key: BEAM-10703
>                 URL: https://issues.apache.org/jira/browse/BEAM-10703
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>            Reporter: Siyuan Chen
>            Assignee: Siyuan Chen
>            Priority: P2
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> The proposal of improving GroupIntoBatches transform is in BEAM-10475
> This tracks the support in Cloud Dataflow Runner.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to