m-trieu commented on code in PR #28835: URL: https://github.com/apache/beam/pull/28835#discussion_r1369265970
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java: ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import com.google.errorprone.annotations.CheckReturnValue; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; 
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream; +import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer; +import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the + * {@link org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s). 
+ */ +@CheckReturnValue +@ThreadSafe +public class StreamingEngineClient { + @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100; + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class); + private static final String SCHEDULED_BUDGET_REFRESH_THREAD = "ScheduledBudgetRefreshThread"; + private static final String TRIGGERED_BUDGET_REFRESH_THREAD = "TriggeredBudgetRefreshThread"; + private static final String CONSUMER_WORKER_METADATA_THREAD = "ConsumeWorkerMetadataThread"; + private final AtomicBoolean started; + private final AtomicBoolean running; + private final GetWorkBudget totalGetWorkBudget; + private final StreamingEngineStreamFactory streamingEngineStreamFactory; + private final WorkItemReceiver workItemReceiver; + private final WindmillGrpcStubFactory windmillGrpcStubFactory; + private final GetWorkRequest getWorkRequest; + private final GetWorkBudgetDistributor getWorkBudgetDistributor; + private final DispatcherClient dispatcherClient; + /** + * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, with its initial value + * being null. + */ + private final AtomicReference<GetWorkerMetadataStream> getWorkerMetadataStream; + + private final AtomicBoolean isBudgetRefreshPaused; + /** Writes are guarded by synchronization, reads are lock free. */ + private final AtomicReference<StreamEngineConnectionState> connections; + + private final ExecutorService triggeredBudgetRefreshListener; + /** + * Used to implement publish/subscribe behavior for triggering budget refreshes/redistribution. + * Subscriber {@link #triggeredBudgetRefreshListener} waits for messages to be written to the + * queue, blocking if necessary. + */ + private final SynchronousQueue<Boolean> budgetRefreshTrigger; + + /** Redistributes the budget on a timed cadence. 
*/ + private final ScheduledExecutorService scheduledBudgetRefreshExecutor; + + private final ThrottleTimer getWorkerMetadataThrottleTimer; + private final CountDownLatch getWorkerMetadataReady; + private final ExecutorService consumeWorkerMetadataExecutor; + + private StreamingEngineClient( + AtomicBoolean running, + GetWorkBudget totalGetWorkBudget, + AtomicReference<StreamEngineConnectionState> connections, + StreamingEngineStreamFactory streamingEngineStreamFactory, + WorkItemReceiver workItemReceiver, + WindmillGrpcStubFactory windmillGrpcStubFactory, + GetWorkRequest getWorkRequest, + GetWorkBudgetDistributor getWorkBudgetDistributor, + DispatcherClient dispatcherClient) { + this.started = new AtomicBoolean(); + this.running = running; + this.totalGetWorkBudget = totalGetWorkBudget; + this.streamingEngineStreamFactory = streamingEngineStreamFactory; + this.workItemReceiver = workItemReceiver; + this.connections = connections; + this.windmillGrpcStubFactory = windmillGrpcStubFactory; + this.getWorkRequest = getWorkRequest; + this.getWorkBudgetDistributor = getWorkBudgetDistributor; + this.dispatcherClient = dispatcherClient; + this.isBudgetRefreshPaused = new AtomicBoolean(false); + this.budgetRefreshTrigger = new SynchronousQueue<>(/* fair= */ true); + this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); + this.scheduledBudgetRefreshExecutor = + createSingleThreadedExecutor(SCHEDULED_BUDGET_REFRESH_THREAD); + this.consumeWorkerMetadataExecutor = + createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD); + this.triggeredBudgetRefreshListener = + createSingleThreadedExecutor(TRIGGERED_BUDGET_REFRESH_THREAD); + this.getWorkerMetadataStream = new AtomicReference<>(); + this.getWorkerMetadataReady = new CountDownLatch(1); + } + + public static StreamingEngineClient create( Review Comment: added some comments about the initial state and what the method does besides create an instance (i.e starts GetWorkerMetadataStream) -- This is an automated message 
from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
