nehsyc commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r474234594



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -72,25 +106,21 @@
         CacheBuilder.newBuilder()
             .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
             .removalListener(
-                (RemovalNotification<KV<String, ByteString>, CacheEntry> 
notification) -> {
+                (RemovalNotification<CacheKey, CacheEntry> notification) -> {
                   if (notification.getCause() != RemovalCause.EXPLICIT) {
-                    LOG.info("Closing idle reader for {}", 
keyToString(notification.getKey()));
+                    LOG.info("Closing idle reader for {}", 
notification.getKey().toString());

Review comment:
       Done.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = 
Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 
1);

Review comment:
       Done.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = 
Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 
1);
+    ShardedKey key1_shard2 = new ShardedKey(ByteString.copyFromUtf8("key1"), 
2);

Review comment:
       Done.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -1095,14 +1098,46 @@ public void run() {
             }
           }
         };
-    if (!computationState.activateWork(workItem.getKey(), work)) {
+    if (!computationState.activateWork(
+        new ShardedKey(workItem.getKey(), workItem.getShardingKey()), work)) {
       // Free worker if the work was not activated.
       // This can happen if it's duplicate work or some other reason.
       sdkHarnessRegistry.completeWork(worker);
     }
   }
 
+  static class ShardedKey {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to