scwhittle commented on code in PR #29879:
URL: https://github.com/apache/beam/pull/29879#discussion_r1455054244
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -579,72 +578,91 @@ public void start() {
sampler.start();
// Periodically report workers counters and other updates.
- globalWorkerUpdatesTimer =
executorSupplier.apply("GlobalWorkerUpdatesTimer");
- globalWorkerUpdatesTimer.scheduleWithFixedDelay(
- this::reportPeriodicWorkerUpdates,
- 0,
- options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
- TimeUnit.MILLISECONDS);
-
- refreshWorkTimer = executorSupplier.apply("RefreshWork");
+ timersMap.put("GlobalWorkerUpdatesTimer",
executorSupplier.apply("GlobalWorkerUpdates"));
+ timersMap
+ .get("GlobalWorkerUpdatesTimer")
+ .scheduleWithFixedDelay(
+ this::reportPeriodicWorkerUpdates,
+ 0,
+ options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
+ TimeUnit.MILLISECONDS);
+
+ timersMap.put("WorkerMessageTimer",
executorSupplier.apply("ReportWorkerMessage"));
Review Comment:
no need to create this executor if not actually scheduling anything on it,
see above.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -276,13 +277,11 @@ public class StreamingDataflowWorker {
private final HotKeyLogger hotKeyLogger;
// Periodic sender of debug information to the debug capture service.
private final DebugCapture.@Nullable Manager debugCaptureManager;
- private ScheduledExecutorService refreshWorkTimer;
- private ScheduledExecutorService statusPageTimer;
- private ScheduledExecutorService globalWorkerUpdatesTimer;
+ // Map from timer name to ScheduledExecutorService
+ private ConcurrentMap<String, ScheduledExecutorService> timersMap = new
ConcurrentHashMap();
Review Comment:
I think that timersMap is a little confusing given the windmill Timer
terminology.
How about just scheduledExecutors? or periodicFunctionExecutors?
Also as a nit, we're not using the key or using this concurrently and it
makes things a little more complicated. I think you could change this to an
ArrayList and avoid repeating the key. You can also only create the executor if
needed.
Example:
```
if (...) {
ScheduledExecutorService workUpdatesService =
executorSupplier.apply("GlobalWorkerUpdates");
workUpdatesService.scheduleWithFixedDelay();
scheduledExecutors.add(workUpdateService);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]