parveensania commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2370953292


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java:
##########
@@ -80,7 +80,7 @@ public final class StreamingWorkerStatusPages {
   private final @Nullable GrpcWindmillStreamFactory windmillStreamFactory;
   private final DebugCapture.@Nullable Manager debugCapture;
   private final @Nullable ChannelzServlet channelzServlet;
-  private final @Nullable ChannelCache channelCache;

Review Comment:
   No it wasn't. Reverted it. 



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -4058,6 +4067,106 @@ public void testStuckCommit() throws Exception {
         removeDynamicFields(result.get(1L)));
   }
 
+  @Test
+  public void testSwitchStreamingWorkerHarness() throws Exception {
+    if (!streamingEngine) {
+      return;
+    }
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    // Start with CloudPath.
+    DataflowWorkerHarnessOptions options =
+        createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false");
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setOptions(options)
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+
+    GrpcDispatcherClient mockDispatcherClient = mock(GrpcDispatcherClient.class);
+
+    // FanOutStreamingEngineWorkerHarness creates
+    // CloudWindmillMetadataServiceV1Alpha1Stub and expects the stream to
+    // successfully start. Mocking it here.
+    Channel mockChannel = mock(Channel.class);
+    ClientCall<WorkerMetadataRequest, WorkerMetadataResponse> mockClientCall =
+        mock(ClientCall.class);
+    when(mockChannel.newCall(
+            eq(CloudWindmillMetadataServiceV1Alpha1Grpc.getGetWorkerMetadataMethod()), any()))
+        .thenReturn(mockClientCall);
+    when(mockDispatcherClient.getWindmillMetadataServiceStubBlocking())
+        .thenReturn(CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(mockChannel));
+    java.lang.reflect.Field dispatcherClientField =
+        StreamingDataflowWorker.class.getDeclaredField("dispatcherClient");
+    dispatcherClientField.setAccessible(true);
+    dispatcherClientField.set(worker, mockDispatcherClient);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
