scwhittle commented on code in PR #28835: URL: https://github.com/apache/beam/pull/28835#discussion_r1372987582
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/StreamingEngineThrottleTimers.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.util; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer; Review Comment: seems odd that ThrottleTimer is in grpc package but the collection of them is in a different package. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamSender; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Evenly distributes the provided budget across the available {@link WindmillStreamSender}(s). 
*/ +public final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { + private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); + private final Supplier<GetWorkBudget> activeWorkBudgetSupplier; + + public EvenGetWorkBudgetDistributor(Supplier<GetWorkBudget> activeWorkBudgetSupplier) { + this.activeWorkBudgetSupplier = activeWorkBudgetSupplier; + } + + @Override + public void distributeBudget( + ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) { + if (streams.isEmpty()) { + LOG.warn("Cannot distribute budget to no streams."); + return; + } + + if (getWorkBudget.equals(GetWorkBudget.noBudget())) { + LOG.warn("Cannot distribute 0 budget."); Review Comment: ditto, this one in particular could happen more regularly if we have enough work already on this worker. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamSender; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Evenly distributes the provided budget across the available {@link WindmillStreamSender}(s). 
*/ +public final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { + private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); + private final Supplier<GetWorkBudget> activeWorkBudgetSupplier; + + public EvenGetWorkBudgetDistributor(Supplier<GetWorkBudget> activeWorkBudgetSupplier) { + this.activeWorkBudgetSupplier = activeWorkBudgetSupplier; + } + + @Override + public void distributeBudget( + ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) { + if (streams.isEmpty()) { + LOG.warn("Cannot distribute budget to no streams."); + return; + } + + if (getWorkBudget.equals(GetWorkBudget.noBudget())) { + LOG.warn("Cannot distribute 0 budget."); + return; + } + + ImmutableMap<WindmillStreamSender, GetWorkBudget> desiredBudgets = + computeDesiredBudgets(streams, getWorkBudget); + + for (Entry<WindmillStreamSender, GetWorkBudget> streamAndDesiredBudget : + desiredBudgets.entrySet()) { + WindmillStreamSender stream = streamAndDesiredBudget.getKey(); + GetWorkBudget desired = streamAndDesiredBudget.getValue(); + GetWorkBudget remaining = stream.remainingGetWorkBudget(); + // Increase budget when it drops below 50% of the target. + if (remaining.isLessThanFiftyPercentOf(desired)) { + long itemAdjustment = desired.items() - remaining.items(); Review Comment: use a budget subtraction method? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamSender; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Evenly distributes the provided budget across the available {@link WindmillStreamSender}(s). 
*/ +public final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { + private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); + private final Supplier<GetWorkBudget> activeWorkBudgetSupplier; + + public EvenGetWorkBudgetDistributor(Supplier<GetWorkBudget> activeWorkBudgetSupplier) { + this.activeWorkBudgetSupplier = activeWorkBudgetSupplier; + } + + @Override + public void distributeBudget( + ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) { + if (streams.isEmpty()) { + LOG.warn("Cannot distribute budget to no streams."); + return; + } + + if (getWorkBudget.equals(GetWorkBudget.noBudget())) { + LOG.warn("Cannot distribute 0 budget."); + return; + } + + ImmutableMap<WindmillStreamSender, GetWorkBudget> desiredBudgets = Review Comment: nit: just use Map interface here? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamSender; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Evenly distributes the provided budget across the available {@link WindmillStreamSender}(s). 
*/ +public final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { + private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); + private final Supplier<GetWorkBudget> activeWorkBudgetSupplier; + + public EvenGetWorkBudgetDistributor(Supplier<GetWorkBudget> activeWorkBudgetSupplier) { + this.activeWorkBudgetSupplier = activeWorkBudgetSupplier; + } + + @Override + public void distributeBudget( + ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) { + if (streams.isEmpty()) { + LOG.warn("Cannot distribute budget to no streams."); + return; + } + + if (getWorkBudget.equals(GetWorkBudget.noBudget())) { + LOG.warn("Cannot distribute 0 budget."); + return; + } + + ImmutableMap<WindmillStreamSender, GetWorkBudget> desiredBudgets = + computeDesiredBudgets(streams, getWorkBudget); + + for (Entry<WindmillStreamSender, GetWorkBudget> streamAndDesiredBudget : + desiredBudgets.entrySet()) { + WindmillStreamSender stream = streamAndDesiredBudget.getKey(); + GetWorkBudget desired = streamAndDesiredBudget.getValue(); + GetWorkBudget remaining = stream.remainingGetWorkBudget(); + // Increase budget when it drops below 50% of the target. + if (remaining.isLessThanFiftyPercentOf(desired)) { Review Comment: seems like an odd method on Budget. How about a private function in this class boolean shouldSendBudgetRefresh(remaining, desired); ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/ProcessWorkItemClient.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; + +/** + * A client context to process {@link WorkItem} and route all subsequent Windmill WorkItem API calls + * to the same backend worker. Wraps the {@link WorkItem}. + */ +@AutoValue +public abstract class ProcessWorkItemClient { + public static ProcessWorkItemClient create( + WorkItem workItem, GetDataStream getDataStream, CommitWorkStream commitWorkStream) { + return new AutoValue_ProcessWorkItemClient(workItem, getDataStream, commitWorkStream); + } + + /** {@link WorkItem} being processed. */ + public abstract WorkItem workItem(); + + public abstract GetDataStream getDataStream(); Review Comment: The GetDataStream that connects to windmill worker handling the work item. 
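For the EvenGetWorkBudgetDistributor comments above, a minimal sketch of what the suggested refactor could look like; the subtract method on GetWorkBudget and the shouldSendBudgetRefresh helper are illustrative names following the review suggestions, not code from the PR:

    // Hypothetical addition to GetWorkBudget: element-wise subtraction, clamped at zero
    // here so callers never receive a negative adjustment.
    public GetWorkBudget subtract(GetWorkBudget other) {
      return GetWorkBudget.builder()
          .setItems(Math.max(0, items() - other.items()))
          .setBytes(Math.max(0, bytes() - other.bytes()))
          .build();
    }

    // Hypothetical private helper in EvenGetWorkBudgetDistributor replacing
    // GetWorkBudget.isLessThanFiftyPercentOf(...): refresh once the remaining budget drops
    // below half of the desired budget on either dimension.
    private static boolean shouldSendBudgetRefresh(GetWorkBudget remaining, GetWorkBudget desired) {
      return remaining.items() < desired.items() / 2 || remaining.bytes() < desired.bytes() / 2;
    }

    // The loop body in distributeBudget could then read:
    //   if (shouldSendBudgetRefresh(remaining, desired)) {
    //     GetWorkBudget adjustment = desired.subtract(remaining);
    //     stream.adjustBudget(adjustment.items(), adjustment.bytes());
    //   }

Whether subtraction clamps at zero is a design choice; the clamp here only serves to avoid handing out negative adjustments if the check and the subtraction ever race.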
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamSender; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Evenly distributes the provided budget across the available {@link WindmillStreamSender}(s). */ +public final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { + private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); + private final Supplier<GetWorkBudget> activeWorkBudgetSupplier; + + public EvenGetWorkBudgetDistributor(Supplier<GetWorkBudget> activeWorkBudgetSupplier) { + this.activeWorkBudgetSupplier = activeWorkBudgetSupplier; + } + + @Override + public void distributeBudget( + ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) { + if (streams.isEmpty()) { + LOG.warn("Cannot distribute budget to no streams."); + return; + } + + if (getWorkBudget.equals(GetWorkBudget.noBudget())) { + LOG.warn("Cannot distribute 0 budget."); + return; + } + + ImmutableMap<WindmillStreamSender, GetWorkBudget> desiredBudgets = + computeDesiredBudgets(streams, getWorkBudget); + + for (Entry<WindmillStreamSender, GetWorkBudget> streamAndDesiredBudget : + desiredBudgets.entrySet()) { + WindmillStreamSender stream = streamAndDesiredBudget.getKey(); + GetWorkBudget desired = streamAndDesiredBudget.getValue(); + GetWorkBudget remaining = stream.remainingGetWorkBudget(); + // Increase budget when it drops below 50% of the target. 
+ if (remaining.isLessThanFiftyPercentOf(desired)) { + long itemAdjustment = desired.items() - remaining.items(); + long byteAdjustment = desired.bytes() - remaining.bytes(); + + LOG.info( + "Adjusting budget for stream={} by items={} and bytes{}", + stream, + itemAdjustment, + byteAdjustment); + + stream.adjustBudget(itemAdjustment, byteAdjustment); + } + } + } + + private ImmutableMap<WindmillStreamSender, GetWorkBudget> computeDesiredBudgets( + ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget totalGetWorkBudget) { + GetWorkBudget activeWorkBudget = activeWorkBudgetSupplier.get(); + LOG.info("Current active work budget: {}", activeWorkBudget); + GetWorkBudget budgetPerStream = + GetWorkBudget.builder() + .setItems( + divideAndRoundUp( + totalGetWorkBudget.items() - activeWorkBudget.items(), streams.size())) + .setBytes( + divideAndRoundUp( + totalGetWorkBudget.bytes() - activeWorkBudget.bytes(), streams.size())) + .build(); + LOG.info("Desired budgets per stream: {}; stream count: {}", budgetPerStream, streams.size()); + return streams.stream().collect(toImmutableMap(Function.identity(), unused -> budgetPerStream)); + } + + private long divideAndRoundUp(long n, int denominator) { + return (long) Math.ceil(n / (denominator * 1.0)); Review Comment: could use https://guava.dev/releases/23.0/api/docs/com/google/common/math/LongMath.html#divide-long-long-java.math.RoundingMode- ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/ProcessWorkItem.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work; + +import com.google.errorprone.annotations.CheckReturnValue; +import java.util.Collection; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +/** + * Receives and processes {@link Review Comment: put on the method? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java: ########## @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import com.google.auto.value.AutoValue; +import com.google.errorprone.annotations.CheckReturnValue; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream; +import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer; +import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the + * {@link org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s). 
+ */ +@CheckReturnValue +@ThreadSafe +public class StreamingEngineClient { + @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100; + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class); + private static final String BUDGET_REFRESH_THREAD = "BudgetRefreshThread"; + private static final String CONSUMER_WORKER_METADATA_THREAD = "ConsumeWorkerMetadataThread"; + + private final AtomicBoolean started; + private final JobHeader jobHeader; + private final GetWorkBudget totalGetWorkBudget; + private final StreamingEngineStreamFactory streamingEngineStreamFactory; + private final WorkItemReceiver workItemReceiver; + private final WindmillGrpcStubFactory windmillGrpcStubFactory; + private final GetWorkBudgetDistributor getWorkBudgetDistributor; + private final DispatcherClient dispatcherClient; + private final AtomicBoolean isBudgetRefreshPaused; + /** Writes are guarded by synchronization, reads are lock free. */ + private final AtomicReference<StreamEngineConnectionState> connections; + + /** + * Used to implement publish/subscribe behavior for triggering budget refreshes/redistribution. + * Subscriber {@link #budgetRefreshExecutor} will either redistribute the budget if a value has + * been triggered or more than {@link #SCHEDULED_BUDGET_REFRESH_MILLIS} time has passed. + */ + private final BlockingDeque<TimeStampedTriggeredBudgetRefresh> budgetRefreshTrigger; + + private final ScheduledExecutorService budgetRefreshExecutor; + private final AtomicReference<Instant> lastBudgetRefresh; + private final ThrottleTimer getWorkerMetadataThrottleTimer; + private final CountDownLatch getWorkerMetadataReady; + private final ExecutorService consumeWorkerMetadataExecutor; + private final long clientId; + /** + * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, with its initial value + * being null. + */ + private volatile @Nullable GetWorkerMetadataStream getWorkerMetadataStream; + + private StreamingEngineClient( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + AtomicReference<StreamEngineConnectionState> connections, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + this.jobHeader = jobHeader; + this.totalGetWorkBudget = totalGetWorkBudget; + this.started = new AtomicBoolean(); + this.streamingEngineStreamFactory = streamingEngineStreamFactory; + this.workItemReceiver = workItemReceiver; + this.connections = connections; + this.windmillGrpcStubFactory = windmillGrpcStubFactory; + this.getWorkBudgetDistributor = getWorkBudgetDistributor; + this.dispatcherClient = dispatcherClient; + this.isBudgetRefreshPaused = new AtomicBoolean(false); + this.budgetRefreshTrigger = new LinkedBlockingDeque<>(); + this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); + this.budgetRefreshExecutor = createSingleThreadedExecutor(BUDGET_REFRESH_THREAD); + this.consumeWorkerMetadataExecutor = + createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD); + this.getWorkerMetadataStream = null; + this.getWorkerMetadataReady = new CountDownLatch(1); + this.clientId = new Random().nextLong(); + this.lastBudgetRefresh = new AtomicReference<>(Instant.EPOCH); + } + + /** + * Creates an instance of {@link StreamingEngineClient} and starts the {@link + * GetWorkerMetadataStream} with an RPC to the StreamingEngine backend. 
{@link + * GetWorkerMetadataStream} will populate {@link #connections} when a response is received. Calls + * to {@link #startAndCacheStreams()} will block until {@link #connections} are populated. + */ + public static StreamingEngineClient create( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + StreamingEngineClient streamingEngineClient = + new StreamingEngineClient( + jobHeader, + totalGetWorkBudget, + new AtomicReference<>(StreamEngineConnectionState.EMPTY), + streamingEngineStreamFactory, + workItemReceiver, + windmillGrpcStubFactory, + getWorkBudgetDistributor, + dispatcherClient); + streamingEngineClient.startGetWorkerMetadataStream(); + return streamingEngineClient; + } + + @VisibleForTesting + static StreamingEngineClient forTesting( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + AtomicReference<StreamEngineConnectionState> connections, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + StreamingEngineClient streamingEngineClient = + new StreamingEngineClient( + jobHeader, + totalGetWorkBudget, + connections, + streamingEngineStreamFactory, + workItemReceiver, + windmillGrpcStubFactory, + getWorkBudgetDistributor, + dispatcherClient); + streamingEngineClient.startGetWorkerMetadataStream(); + return streamingEngineClient; + } + + private static ScheduledExecutorService createSingleThreadedExecutor(String threadName) { + return Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat(threadName) + .setUncaughtExceptionHandler( + (t, e) -> + LOG.error( + "{} failed due to uncaught exception during execution. ", t.getName(), e)) + .build()); + } + + /** + * Starts the streams with the {@link #connections} values. Does nothing if this has already been + * called. + * + * @throws IllegalArgumentException if trying to start before {@link #connections} are set with + * {@link GetWorkerMetadataStream}. + */ + public void startAndCacheStreams() { + // Do nothing if we have already initialized the initial streams. 
+ if (!started.compareAndSet(false, true)) { + return; + } + waitForFirstStreamingEngineEndpoints(); + StreamEngineConnectionState currentConnectionsState = connections.get(); + Preconditions.checkState( + !StreamEngineConnectionState.EMPTY.equals(currentConnectionsState), + "Cannot start streams without connections."); + LOG.info("Starting initial GetWorkStreams with connections={}", currentConnectionsState); + ImmutableCollection<WindmillStreamSender> windmillStreamSenders = + currentConnectionsState.windmillStreams().values(); + getWorkBudgetDistributor.distributeBudget( + currentConnectionsState.windmillStreams().values(), totalGetWorkBudget); + lastBudgetRefresh.compareAndSet(Instant.EPOCH, Instant.now()); + windmillStreamSenders.forEach(WindmillStreamSender::startStreams); + startBudgetRefreshThreads(); + } + + private void waitForFirstStreamingEngineEndpoints() { + try { + getWorkerMetadataReady.await(); + } catch (InterruptedException e) { + throw new StreamingEngineClientException( + "Error occurred waiting for StreamingEngine backend endpoints.", e); + } + } + + /** + * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on + * new backend worker metadata. + */ + private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { + isBudgetRefreshPaused.set(true); + LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); + ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections = + createNewWindmillConnections(ImmutableSet.copyOf(newWindmillEndpoints.windmillEndpoints())); + ImmutableMap<WindmillConnection, WindmillStreamSender> newWindmillStreams = + closeStaleStreamsAndCreateNewStreams(ImmutableSet.copyOf(newWindmillConnections.values())); + ImmutableMap<Endpoint, Supplier<GetDataStream>> newGlobalDataStreams = + createNewGlobalDataStreams( + ImmutableSet.copyOf(newWindmillEndpoints.globalDataEndpoints().values())); + + StreamEngineConnectionState newConnectionsState = + StreamEngineConnectionState.builder() + .setWindmillConnections(newWindmillConnections) + .setWindmillStreams(newWindmillStreams) + .setGlobalDataEndpoints(newWindmillEndpoints.globalDataEndpoints()) + .setGlobalDataStreams(newGlobalDataStreams) + .build(); + + LOG.info( + "Setting new connections: {}. Previous connections: {}.", + newConnectionsState, + connections.get()); + connections.set(newConnectionsState); + isBudgetRefreshPaused.set(false); + + // On first worker metadata. Trigger + if (getWorkerMetadataReady.getCount() > 0) { + getWorkerMetadataReady.countDown(); + } else { + requestBudgetRefresh(TimeStampedTriggeredBudgetRefresh.Event.NEW_ENDPOINTS); + } + } + + public ImmutableList<Long> getAndResetThrottleTimes() { + StreamEngineConnectionState currentConnections = connections.get(); + + ImmutableList<Long> keyedWorkStreamThrottleTimes = + currentConnections.windmillStreams().values().stream() + .map(WindmillStreamSender::getAndResetThrottleTime) + .collect(toImmutableList()); + + return ImmutableList.<Long>builder() + .add(getWorkerMetadataThrottleTimer.getAndResetThrottleTime()) + .addAll(keyedWorkStreamThrottleTimes) + .build(); + } + + /** Starts {@link GetWorkerMetadataStream}. */ + @SuppressWarnings({ + "FutureReturnValueIgnored", // ignoring Future returned from Executor.submit() + "nullness" // Uninitialized value of getWorkerMetadataStream is null. + }) + private void startGetWorkerMetadataStream() { + // We only want to set and start this value once. 
+ if (getWorkerMetadataStream == null) { + synchronized (this) { + if (getWorkerMetadataStream == null) { + getWorkerMetadataStream = + streamingEngineStreamFactory.createGetWorkerMetadataStream( + dispatcherClient.getDispatcherStub(), + getWorkerMetadataThrottleTimer, + endpoints -> + consumeWorkerMetadataExecutor.submit( + () -> consumeWindmillWorkerEndpoints(endpoints))); + } + } + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void startBudgetRefreshThreads() { + budgetRefreshExecutor.scheduleAtFixedRate( + this::refreshBudget, + SCHEDULED_BUDGET_REFRESH_MILLIS, + SCHEDULED_BUDGET_REFRESH_MILLIS, + TimeUnit.MILLISECONDS); + } + + private void refreshBudget() { Review Comment: What is the benefit of the fixedRate versus a loop if its running continuously? I think we still want some way to exit early on shutdown so we don't have to wait for the SCHEDULED_BUDGET_REFRESH for the method to finish. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java: ########## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import com.google.errorprone.annotations.CheckReturnValue; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream; +import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer; +import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; +import org.apache.beam.sdk.fn.stream.AdvancingPhaser; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the + * {@link org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s). 
+ */ +@CheckReturnValue +@ThreadSafe +public class StreamingEngineClient { + @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100; + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class); + private static final String BUDGET_REFRESH_THREAD = "BudgetRefreshThread"; + private static final String CONSUMER_WORKER_METADATA_THREAD = "ConsumeWorkerMetadataThread"; + + private final AtomicBoolean started; + private final JobHeader jobHeader; + private final GetWorkBudget totalGetWorkBudget; + private final StreamingEngineStreamFactory streamingEngineStreamFactory; + private final WorkItemReceiver workItemReceiver; + private final WindmillGrpcStubFactory windmillGrpcStubFactory; + private final GetWorkBudgetDistributor getWorkBudgetDistributor; + private final DispatcherClient dispatcherClient; + private final AtomicBoolean isBudgetRefreshPaused; + /** Writes are guarded by synchronization, reads are lock free. */ + private final AtomicReference<StreamEngineConnectionState> connections; + + /** + * Used to implement publish/subscribe behavior for triggering budget refreshes/redistribution. + * Subscriber {@link #budgetRefreshExecutor} will either redistribute the budget if manually + * triggered or more than {@link #SCHEDULED_BUDGET_REFRESH_MILLIS} time has passed. + */ + private final AdvancingPhaser budgetRefreshTrigger; + + private final ScheduledExecutorService budgetRefreshExecutor; + private final AtomicReference<Instant> lastBudgetRefresh; + private final ThrottleTimer getWorkerMetadataThrottleTimer; + private final CountDownLatch getWorkerMetadataReady; + private final ExecutorService consumeWorkerMetadataExecutor; + private final long clientId; + /** + * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, with its initial value + * being null. + */ + private volatile @Nullable GetWorkerMetadataStream getWorkerMetadataStream; + + private StreamingEngineClient( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + AtomicReference<StreamEngineConnectionState> connections, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + this.jobHeader = jobHeader; + this.totalGetWorkBudget = totalGetWorkBudget; + this.started = new AtomicBoolean(); + this.streamingEngineStreamFactory = streamingEngineStreamFactory; + this.workItemReceiver = workItemReceiver; + this.connections = connections; + this.windmillGrpcStubFactory = windmillGrpcStubFactory; + this.getWorkBudgetDistributor = getWorkBudgetDistributor; + this.dispatcherClient = dispatcherClient; + this.isBudgetRefreshPaused = new AtomicBoolean(false); + this.budgetRefreshTrigger = new AdvancingPhaser(1); + this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); + this.budgetRefreshExecutor = createSingleThreadedExecutor(BUDGET_REFRESH_THREAD); + this.consumeWorkerMetadataExecutor = + createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD); + this.getWorkerMetadataStream = null; + this.getWorkerMetadataReady = new CountDownLatch(1); + this.clientId = new Random().nextLong(); + this.lastBudgetRefresh = new AtomicReference<>(Instant.EPOCH); + } + + /** + * Creates an instance of {@link StreamingEngineClient} and starts the {@link + * GetWorkerMetadataStream} with an RPC to the StreamingEngine backend. 
{@link + * GetWorkerMetadataStream} will populate {@link #connections} when a response is received. Calls + * to {@link #startAndCacheStreams()} will block until {@link #connections} are populated. + */ + public static StreamingEngineClient create( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + StreamingEngineClient streamingEngineClient = + new StreamingEngineClient( + jobHeader, + totalGetWorkBudget, + new AtomicReference<>(StreamEngineConnectionState.EMPTY), + streamingEngineStreamFactory, + workItemReceiver, + windmillGrpcStubFactory, + getWorkBudgetDistributor, + dispatcherClient); + streamingEngineClient.startGetWorkerMetadataStream(); + return streamingEngineClient; + } + + @VisibleForTesting + static StreamingEngineClient forTesting( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + AtomicReference<StreamEngineConnectionState> connections, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + StreamingEngineClient streamingEngineClient = + new StreamingEngineClient( + jobHeader, + totalGetWorkBudget, + connections, + streamingEngineStreamFactory, + workItemReceiver, + windmillGrpcStubFactory, + getWorkBudgetDistributor, + dispatcherClient); + streamingEngineClient.startGetWorkerMetadataStream(); + return streamingEngineClient; + } + + private static ScheduledExecutorService createSingleThreadedExecutor(String threadName) { + return Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat(threadName) + .setUncaughtExceptionHandler( + (t, e) -> + LOG.error( + "{} failed due to uncaught exception during execution. ", t.getName(), e)) + .build()); + } + + /** + * Starts the streams with the {@link #connections} values. Does nothing if this has already been + * called. + * + * @throws IllegalArgumentException if trying to start before {@link #connections} are set with + * {@link GetWorkerMetadataStream}. + */ + public void startAndCacheStreams() { + // Do nothing if we have already initialized the initial streams. 
+ if (!started.compareAndSet(false, true)) { + return; + } + waitForFirstStreamingEngineEndpoints(); + StreamEngineConnectionState currentConnectionsState = connections.get(); + Preconditions.checkState( + !StreamEngineConnectionState.EMPTY.equals(currentConnectionsState), + "Cannot start streams without connections."); + LOG.info("Starting initial GetWorkStreams with connections={}", currentConnectionsState); + ImmutableCollection<WindmillStreamSender> windmillStreamSenders = + currentConnectionsState.windmillStreams().values(); + getWorkBudgetDistributor.distributeBudget( + currentConnectionsState.windmillStreams().values(), totalGetWorkBudget); + lastBudgetRefresh.compareAndSet(Instant.EPOCH, Instant.now()); + windmillStreamSenders.forEach(WindmillStreamSender::startStreams); + startBudgetRefreshThread(); + } + + private void waitForFirstStreamingEngineEndpoints() { + try { + getWorkerMetadataReady.await(); + } catch (InterruptedException e) { + throw new StreamingEngineClientException( + "Error occurred waiting for StreamingEngine backend endpoints.", e); + } + } + + /** + * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on + * new backend worker metadata. + */ + private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { + isBudgetRefreshPaused.set(true); + LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); + ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections = + createNewWindmillConnections(ImmutableSet.copyOf(newWindmillEndpoints.windmillEndpoints())); + ImmutableMap<WindmillConnection, WindmillStreamSender> newWindmillStreams = + closeStaleStreamsAndCreateNewStreams(ImmutableSet.copyOf(newWindmillConnections.values())); + ImmutableMap<Endpoint, Supplier<GetDataStream>> newGlobalDataStreams = + createNewGlobalDataStreams( + ImmutableSet.copyOf(newWindmillEndpoints.globalDataEndpoints().values())); + + StreamEngineConnectionState newConnectionsState = + StreamEngineConnectionState.builder() + .setWindmillConnections(newWindmillConnections) + .setWindmillStreams(newWindmillStreams) + .setGlobalDataEndpoints(newWindmillEndpoints.globalDataEndpoints()) + .setGlobalDataStreams(newGlobalDataStreams) + .build(); + + LOG.info( + "Setting new connections: {}. Previous connections: {}.", + newConnectionsState, + connections.get()); + connections.set(newConnectionsState); + isBudgetRefreshPaused.set(false); + + // On first worker metadata. Trigger + if (getWorkerMetadataReady.getCount() > 0) { + getWorkerMetadataReady.countDown(); + } else { + requestBudgetRefresh(); + } + } + + public ImmutableList<Long> getAndResetThrottleTimes() { + StreamEngineConnectionState currentConnections = connections.get(); + + ImmutableList<Long> keyedWorkStreamThrottleTimes = + currentConnections.windmillStreams().values().stream() + .map(WindmillStreamSender::getAndResetThrottleTime) + .collect(toImmutableList()); + + return ImmutableList.<Long>builder() + .add(getWorkerMetadataThrottleTimer.getAndResetThrottleTime()) + .addAll(keyedWorkStreamThrottleTimes) + .build(); + } + + /** Starts {@link GetWorkerMetadataStream}. */ + @SuppressWarnings({ + "FutureReturnValueIgnored", // ignoring Future returned from Executor.submit() + "nullness" // Uninitialized value of getWorkerMetadataStream is null. + }) + private void startGetWorkerMetadataStream() { + // We only want to set and start this value once. 
+ if (getWorkerMetadataStream == null) { + synchronized (this) { + if (getWorkerMetadataStream == null) { + getWorkerMetadataStream = + streamingEngineStreamFactory.createGetWorkerMetadataStream( + dispatcherClient.getDispatcherStub(), + getWorkerMetadataThrottleTimer, + endpoints -> + consumeWorkerMetadataExecutor.submit( + () -> consumeWindmillWorkerEndpoints(endpoints))); + } + } + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void startBudgetRefreshThread() { + // Only 1 task will run at a time from the ScheduledExecutor documentation: If any + // execution of this task takes longer than its period, then subsequent executions may start + // late, but will not concurrently execute. + budgetRefreshExecutor.scheduleAtFixedRate( + this::refreshBudget, + // No need to initially wait since refreshBudget will wait before redistributing the budget. + 0, + // Do not wait between successive executions. This should be constantly running. + 0, + TimeUnit.MILLISECONDS); + } + + private void refreshBudget() { + if (isBudgetRefreshPaused.get() || !started.get()) { + return; + } + + // Phaser.awaitAdvanceInterruptibly() returns a negative value if the phaser is + // terminated. + if (waitForBudgetRefreshTrigger() < 0) { + return; + } + + redistributeBudget(); + } + + private int waitForBudgetRefreshTrigger() { + try { + return budgetRefreshTrigger.awaitAdvanceInterruptibly( + 0, SCHEDULED_BUDGET_REFRESH_MILLIS, TimeUnit.MILLISECONDS); Review Comment: you need to remember what the last phase was not just use 0, so that this will only signal if the phase increases (via arrive() triggering) from the last waiting. This could just be kept on the stack if you change to a while loop. Or you could create a helper class to make this easier to use and keep the phase internal to itself. IF you do make it a separate file with tests. class Trigger { void trigger(); class Observer { private phase = 0; // Will return if observed Trigger has trigger() called or deadline reached. // Guaranteed to be triggered if any trigger() call since last await returned. // Multiple triggers may be collapsed into single observation. Observers are independent from each other. boolean awaitWithDeadline(deadline) { } } Observer createObserver(); } ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamSender; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Evenly distributes the provided budget across the available {@link WindmillStreamSender}(s). */ +public final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { + private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); + private final Supplier<GetWorkBudget> activeWorkBudgetSupplier; + + public EvenGetWorkBudgetDistributor(Supplier<GetWorkBudget> activeWorkBudgetSupplier) { + this.activeWorkBudgetSupplier = activeWorkBudgetSupplier; + } + + @Override + public void distributeBudget( + ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) { + if (streams.isEmpty()) { + LOG.warn("Cannot distribute budget to no streams."); Review Comment: can we change this to periodically log? if not maybe better just as debug ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/ProcessWorkItemClient.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; + +/** + * A client context to process {@link WorkItem} and route all subsequent Windmill WorkItem API calls + * to the same backend worker. Wraps the {@link WorkItem}. + */ +@AutoValue +public abstract class ProcessWorkItemClient { + public static ProcessWorkItemClient create( + WorkItem workItem, GetDataStream getDataStream, CommitWorkStream commitWorkStream) { + return new AutoValue_ProcessWorkItemClient(workItem, getDataStream, commitWorkStream); + } + + /** {@link WorkItem} being processed. 
*/ + public abstract WorkItem workItem(); + + public abstract GetDataStream getDataStream(); + + public abstract CommitWorkStream commitWorkStream(); Review Comment: ditto ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/WindmillChannelFactory.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.util; + +import java.net.Inet6Address; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLException; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; + +/** Utilities for creating {@link Channel} for gRPC stubs. */ +public final class WindmillChannelFactory { Review Comment: Ping on this, this PR is very big and it would be helpful for reviewing to have this separated. With force pushes the diffs to latest reviewed files seem to not work making it difficult to review especially with so many files. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java: ########## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import com.google.errorprone.annotations.CheckReturnValue; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream; +import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer; +import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; +import org.apache.beam.sdk.fn.stream.AdvancingPhaser; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the + * {@link org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s). 
+ */ +@CheckReturnValue +@ThreadSafe +public class StreamingEngineClient { + @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100; + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class); + private static final String BUDGET_REFRESH_THREAD = "BudgetRefreshThread"; + private static final String CONSUMER_WORKER_METADATA_THREAD = "ConsumeWorkerMetadataThread"; + + private final AtomicBoolean started; + private final JobHeader jobHeader; + private final GetWorkBudget totalGetWorkBudget; + private final StreamingEngineStreamFactory streamingEngineStreamFactory; + private final WorkItemReceiver workItemReceiver; + private final WindmillGrpcStubFactory windmillGrpcStubFactory; + private final GetWorkBudgetDistributor getWorkBudgetDistributor; + private final DispatcherClient dispatcherClient; + private final AtomicBoolean isBudgetRefreshPaused; + /** Writes are guarded by synchronization, reads are lock free. */ + private final AtomicReference<StreamEngineConnectionState> connections; + + /** + * Used to implement publish/subscribe behavior for triggering budget refreshes/redistribution. + * Subscriber {@link #budgetRefreshExecutor} will either redistribute the budget if manually + * triggered or more than {@link #SCHEDULED_BUDGET_REFRESH_MILLIS} time has passed. + */ + private final AdvancingPhaser budgetRefreshTrigger; + + private final ScheduledExecutorService budgetRefreshExecutor; + private final AtomicReference<Instant> lastBudgetRefresh; + private final ThrottleTimer getWorkerMetadataThrottleTimer; + private final CountDownLatch getWorkerMetadataReady; + private final ExecutorService consumeWorkerMetadataExecutor; + private final long clientId; + /** + * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, with its initial value + * being null. + */ + private volatile @Nullable GetWorkerMetadataStream getWorkerMetadataStream; + + private StreamingEngineClient( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + AtomicReference<StreamEngineConnectionState> connections, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + this.jobHeader = jobHeader; + this.totalGetWorkBudget = totalGetWorkBudget; + this.started = new AtomicBoolean(); + this.streamingEngineStreamFactory = streamingEngineStreamFactory; + this.workItemReceiver = workItemReceiver; + this.connections = connections; + this.windmillGrpcStubFactory = windmillGrpcStubFactory; + this.getWorkBudgetDistributor = getWorkBudgetDistributor; + this.dispatcherClient = dispatcherClient; + this.isBudgetRefreshPaused = new AtomicBoolean(false); + this.budgetRefreshTrigger = new AdvancingPhaser(1); + this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); + this.budgetRefreshExecutor = createSingleThreadedExecutor(BUDGET_REFRESH_THREAD); + this.consumeWorkerMetadataExecutor = + createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD); + this.getWorkerMetadataStream = null; + this.getWorkerMetadataReady = new CountDownLatch(1); + this.clientId = new Random().nextLong(); + this.lastBudgetRefresh = new AtomicReference<>(Instant.EPOCH); + } + + /** + * Creates an instance of {@link StreamingEngineClient} and starts the {@link + * GetWorkerMetadataStream} with an RPC to the StreamingEngine backend. 
{@link + * GetWorkerMetadataStream} will populate {@link #connections} when a response is received. Calls + * to {@link #startAndCacheStreams()} will block until {@link #connections} are populated. + */ + public static StreamingEngineClient create( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + StreamingEngineClient streamingEngineClient = + new StreamingEngineClient( + jobHeader, + totalGetWorkBudget, + new AtomicReference<>(StreamEngineConnectionState.EMPTY), + streamingEngineStreamFactory, + workItemReceiver, + windmillGrpcStubFactory, + getWorkBudgetDistributor, + dispatcherClient); + streamingEngineClient.startGetWorkerMetadataStream(); + return streamingEngineClient; + } + + @VisibleForTesting + static StreamingEngineClient forTesting( + JobHeader jobHeader, + GetWorkBudget totalGetWorkBudget, + AtomicReference<StreamEngineConnectionState> connections, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + StreamingEngineClient streamingEngineClient = + new StreamingEngineClient( + jobHeader, + totalGetWorkBudget, + connections, + streamingEngineStreamFactory, + workItemReceiver, + windmillGrpcStubFactory, + getWorkBudgetDistributor, + dispatcherClient); + streamingEngineClient.startGetWorkerMetadataStream(); + return streamingEngineClient; + } + + private static ScheduledExecutorService createSingleThreadedExecutor(String threadName) { + return Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat(threadName) + .setUncaughtExceptionHandler( + (t, e) -> + LOG.error( + "{} failed due to uncaught exception during execution. ", t.getName(), e)) + .build()); + } + + /** + * Starts the streams with the {@link #connections} values. Does nothing if this has already been + * called. + * + * @throws IllegalArgumentException if trying to start before {@link #connections} are set with + * {@link GetWorkerMetadataStream}. + */ + public void startAndCacheStreams() { + // Do nothing if we have already initialized the initial streams. 
+ if (!started.compareAndSet(false, true)) { + return; + } + waitForFirstStreamingEngineEndpoints(); + StreamEngineConnectionState currentConnectionsState = connections.get(); + Preconditions.checkState( + !StreamEngineConnectionState.EMPTY.equals(currentConnectionsState), + "Cannot start streams without connections."); + LOG.info("Starting initial GetWorkStreams with connections={}", currentConnectionsState); + ImmutableCollection<WindmillStreamSender> windmillStreamSenders = + currentConnectionsState.windmillStreams().values(); + getWorkBudgetDistributor.distributeBudget( + currentConnectionsState.windmillStreams().values(), totalGetWorkBudget); + lastBudgetRefresh.compareAndSet(Instant.EPOCH, Instant.now()); + windmillStreamSenders.forEach(WindmillStreamSender::startStreams); + startBudgetRefreshThread(); + } + + private void waitForFirstStreamingEngineEndpoints() { + try { + getWorkerMetadataReady.await(); + } catch (InterruptedException e) { + throw new StreamingEngineClientException( + "Error occurred waiting for StreamingEngine backend endpoints.", e); + } + } + + /** + * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on + * new backend worker metadata. + */ + private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { + isBudgetRefreshPaused.set(true); + LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); + ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections = + createNewWindmillConnections(ImmutableSet.copyOf(newWindmillEndpoints.windmillEndpoints())); + ImmutableMap<WindmillConnection, WindmillStreamSender> newWindmillStreams = + closeStaleStreamsAndCreateNewStreams(ImmutableSet.copyOf(newWindmillConnections.values())); + ImmutableMap<Endpoint, Supplier<GetDataStream>> newGlobalDataStreams = + createNewGlobalDataStreams( + ImmutableSet.copyOf(newWindmillEndpoints.globalDataEndpoints().values())); + + StreamEngineConnectionState newConnectionsState = + StreamEngineConnectionState.builder() + .setWindmillConnections(newWindmillConnections) + .setWindmillStreams(newWindmillStreams) + .setGlobalDataEndpoints(newWindmillEndpoints.globalDataEndpoints()) + .setGlobalDataStreams(newGlobalDataStreams) + .build(); + + LOG.info( + "Setting new connections: {}. Previous connections: {}.", + newConnectionsState, + connections.get()); + connections.set(newConnectionsState); + isBudgetRefreshPaused.set(false); + + // On first worker metadata. Trigger + if (getWorkerMetadataReady.getCount() > 0) { + getWorkerMetadataReady.countDown(); + } else { + requestBudgetRefresh(); + } + } + + public ImmutableList<Long> getAndResetThrottleTimes() { + StreamEngineConnectionState currentConnections = connections.get(); + + ImmutableList<Long> keyedWorkStreamThrottleTimes = + currentConnections.windmillStreams().values().stream() + .map(WindmillStreamSender::getAndResetThrottleTime) + .collect(toImmutableList()); + + return ImmutableList.<Long>builder() + .add(getWorkerMetadataThrottleTimer.getAndResetThrottleTime()) + .addAll(keyedWorkStreamThrottleTimes) + .build(); + } + + /** Starts {@link GetWorkerMetadataStream}. */ + @SuppressWarnings({ + "FutureReturnValueIgnored", // ignoring Future returned from Executor.submit() + "nullness" // Uninitialized value of getWorkerMetadataStream is null. + }) + private void startGetWorkerMetadataStream() { + // We only want to set and start this value once. 
+ if (getWorkerMetadataStream == null) { + synchronized (this) { + if (getWorkerMetadataStream == null) { + getWorkerMetadataStream = + streamingEngineStreamFactory.createGetWorkerMetadataStream( + dispatcherClient.getDispatcherStub(), + getWorkerMetadataThrottleTimer, + endpoints -> + consumeWorkerMetadataExecutor.submit( + () -> consumeWindmillWorkerEndpoints(endpoints))); + } + } + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void startBudgetRefreshThread() { + // Only 1 task will run at a time from the ScheduledExecutor documentation: If any + // execution of this task takes longer than its period, then subsequent executions may start + // late, but will not concurrently execute. + budgetRefreshExecutor.scheduleAtFixedRate( + this::refreshBudget, + // No need to initially wait since refreshBudget will wait before redistributing the budget. + 0, + // Do not wait between successive executions. This should be constantly running. + 0, + TimeUnit.MILLISECONDS); + } + + private void refreshBudget() { + if (isBudgetRefreshPaused.get() || !started.get()) { + return; + } + + // Phaser.awaitAdvanceInterruptibly() returns a negative value if the phaser is + // terminated. + if (waitForBudgetRefreshTrigger() < 0) { + return; + } + + redistributeBudget(); + } + + private int waitForBudgetRefreshTrigger() { + try { + return budgetRefreshTrigger.awaitAdvanceInterruptibly( + 0, SCHEDULED_BUDGET_REFRESH_MILLIS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException e) { + // If there is an error, we will proceed with refreshing the budget. + LOG.error("Error occurred waiting for budget refresh.", e); Review Comment: timeout should not be considered an error, we don't necessarily expect to be triggered more often than timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
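For the earlier comment asking whether the "Cannot distribute budget to no streams." / "Cannot distribute 0 budget." warnings in EvenGetWorkBudgetDistributor could be logged periodically (or demoted to debug), one possible shape is a small time-based guard around the logger. The RateLimitedLogger below is only an illustrative sketch under that assumption; it is a hypothetical helper, not an existing Beam or Dataflow worker class, and not what this PR implements.

import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

/** Hypothetical helper: emits a warning at most once per interval, otherwise logs at debug. */
final class RateLimitedLogger {
  private final Logger delegate;
  private final long minIntervalMillis;
  private final AtomicLong lastWarnMillis = new AtomicLong(0);

  RateLimitedLogger(Logger delegate, long minIntervalMillis) {
    this.delegate = delegate;
    this.minIntervalMillis = minIntervalMillis;
  }

  /** Warns only if minIntervalMillis has elapsed since the last emitted warning; debug otherwise. */
  void warn(String message, Object... args) {
    long now = System.currentTimeMillis();
    long last = lastWarnMillis.get();
    if (now - last >= minIntervalMillis && lastWarnMillis.compareAndSet(last, now)) {
      delegate.warn(message, args);
    } else {
      delegate.debug(message, args);
    }
  }
}

With a guard like this the distributor could keep the signal (for example, at most one warning per minute) without spamming the worker logs every time the active-work budget is already saturated.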
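On the last comment, that the timeout in waitForBudgetRefreshTrigger should not be treated as an error: one way that could look is to catch TimeoutException separately from InterruptedException and treat it as the normal "no manual trigger arrived, run the scheduled refresh" path. The snippet below is a minimal sketch assuming the same Phaser-based trigger and the SCHEDULED_BUDGET_REFRESH_MILLIS constant shown in the diff above; it is not the change actually made in the PR.

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch: a phaser timeout is expected and simply means the scheduled refresh should proceed. */
final class BudgetRefreshTriggerSketch {
  private static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100; // mirrors the constant in the diff
  private final Phaser budgetRefreshTrigger = new Phaser(1);

  /** Returns a negative value only if the phaser terminated; a timeout is not logged as an error. */
  int waitForBudgetRefreshTrigger() {
    try {
      // The phase argument of 0 mirrors the diff above.
      return budgetRefreshTrigger.awaitAdvanceInterruptibly(
          0, SCHEDULED_BUDGET_REFRESH_MILLIS, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Expected whenever no refresh was explicitly requested within the window; fall through
      // to the scheduled budget redistribution without logging.
      return 0;
    } catch (InterruptedException e) {
      // Interruption is the genuinely unexpected case; restore the flag and let the caller decide.
      Thread.currentThread().interrupt();
      return 0;
    }
  }
}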
