scwhittle commented on code in PR #32408:
URL: https://github.com/apache/beam/pull/32408#discussion_r1762681609


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -510,13 +500,14 @@ static StreamingDataflowWorker forTesting(
                 /* hasReceivedGlobalConfig= */ true,
                 options.getGlobalConfigRefreshPeriod().getMillis(),
                 workUnitClient,
-                executorSupplier,
-                config ->
-                    onPipelineConfig(
-                        config,
-                        windmillServer::setWindmillServiceEndpoints,
-                        operationalLimits::set))
-            : new 
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
+                globalConfigHandle,
+                executorSupplier)
+            : new StreamingApplianceComputationConfigFetcher(
+                windmillServer::getConfig, globalConfigHandle);
+    StreamingGlobalConfig config = 
configFetcher.getGlobalConfigHandle().getConfig();

Review Comment:
   this seems a little racy, can we just register with the handle to call 
setWindmillServiceEndpoints?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java:
##########
@@ -147,6 +232,45 @@ public void 
registerConfigObserver_shouldNotCallCallbackForIfConfigRemainsSame()
     // call setter again with same config
     globalConfigHandle.setConfig(configToSet.get());
     assertTrue(latch.await(10, TimeUnit.SECONDS));
+    Thread.sleep(TimeUnit.SECONDS.toMillis(10));
     assertEquals(1, callbackCount.get());
   }
+
+  @Test
+  public void registerConfigObserver_updateConfigWhenCallbackIsRunning()
+      throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new 
StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig initialConfig =
+        StreamingGlobalConfig.builder()
+            
.setOperationalLimits(OperationalLimits.builder().setMaxOutputValueBytes(4569).build())
+            .build();
+    StreamingGlobalConfig updatedConfig =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    CopyOnWriteArrayList<StreamingGlobalConfig> configsFromCallback = new 
CopyOnWriteArrayList<>();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configsFromCallback.add(config);
+          if (globalConfigHandle.getConfig().equals(config)) {

Review Comment:
   How about:
   if (config.equals(initialConfig))
   
   seems clearer



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public class StreamingGlobalConfigHandleImpl implements 
StreamingGlobalConfigHandle {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingGlobalConfigHandleImpl.class);
+
+  private final AtomicReference<StreamingGlobalConfig> streamingEngineConfig =
+      new AtomicReference<>();
+
+  private final CopyOnWriteArrayList<ConfigCallback> configCallbacks = new 
CopyOnWriteArrayList<>();
+
+  @Override
+  public StreamingGlobalConfig getConfig() {

Review Comment:
   mark Nullable



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingGlobalConfigHandleImplTest {
+
+  @Test
+  public void getConfig() {
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new 
StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig config =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    globalConfigHandle.setConfig(config);

Review Comment:
   set a modified config and verify that getConfig reflects it.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -275,6 +277,8 @@ public Long get() {
   @Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC();
   @Rule public ErrorCollector errorCollector = new ErrorCollector();
   WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class);
+  StreamingGlobalConfigHandleImpl mockglobalConfigHandle =

Review Comment:
   nit: camel-case is off



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -146,6 +148,13 @@ public boolean hasInitializedEndpoints() {
     return dispatcherStubs.get().hasInitializedEndpoints();
   }
 
+  public void onJobConfig(StreamingGlobalConfig config) {
+    if (config.windmillServiceEndpoints().isEmpty()) {
+      return;

Review Comment:
   should we log a warning/error?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java:
##########
@@ -150,6 +157,17 @@ private void addStreamingEngineStatusPages() {
     statusPages.addCapturePage(Preconditions.checkNotNull(channelzServlet));
     statusPages.addStatusDataProvider(
         "streaming", "Streaming RPCs", 
Preconditions.checkNotNull(windmillStreamFactory));
+    statusPages.addStatusDataProvider(
+        "jobSettings",
+        "User Worker Job Settings",
+        writer -> {
+          StreamingGlobalConfig config = globalConfig.get();

Review Comment:
   mark nullable



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingGlobalConfigHandleImplTest {
+
+  @Test
+  public void getConfig() {
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new 
StreamingGlobalConfigHandleImpl();

Review Comment:
   verify getConfig is null before setConfig



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to