scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1510932832
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -500,8 +503,10 @@ static StreamingDataflowWorker forTesting(
boolean publishCounters,
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
- Function<String, ScheduledExecutorService> executorSupplier) {
- BoundedQueueExecutor boundedQueueExecutor =
createWorkUnitExecutor(options);
+ Function<String, ScheduledExecutorService> executorSupplier,
+ BoundedQueueExecutor executor) {
Review Comment:
how about naming workUnitExecutor and can just use same variable below to
avoid proliferation of names
if (workUnitExeuctor == null) workUnitExecutor =
createWorkUnitExecutor(options);
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1918,6 +1926,26 @@ private Optional<WorkerMessage>
createWorkerMessageForPerWorkerMetrics() {
return
Optional.of(workUnitClient.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics));
}
+ private void
readAndSaveWorkerMessageResponseForStreamingScalingReportResponse(
+ List<WorkerMessageResponse> responses) {
+ for (WorkerMessageResponse response : responses) {
+ if (response.getStreamingScalingReportResponse() != null) {
Review Comment:
would it be possible to get multiple responses? If so perhaps it would be
best to find the last value in the loop and then just set once instead of
possibly setting multiple times.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -59,8 +69,8 @@ public BoundedQueueExecutor(
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
- synchronized (this) {
- if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
+ synchronized (BoundedQueueExecutor.this) {
+ if (activeCount++ >= maximumThreadCount - 1) {
startTimeMaxActiveThreadsUsed = System.currentTimeMillis();
Review Comment:
only reset if zero? I think it could be racy since
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -37,8 +37,17 @@ public class BoundedQueueExecutor {
private final Monitor monitor = new Monitor();
private int elementsOutstanding = 0;
Review Comment:
comment that elementsOutstanding and bytesOutstanding are guarded by the
monitor
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,189 @@
+/*
Review Comment:
I think this file should be in the directory path matching other util files
like:
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitorTest.java
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]