scwhittle commented on code in PR #30312: URL: https://github.com/apache/beam/pull/30312#discussion_r1507507623
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Streaming appliance implementation of {@link WorkCommitter}. 
*/ +@ThreadSafe +final class StreamingApplianceWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class); + private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; + + private final Consumer<CommitWorkRequest> commitWork; + private final WeightedBoundedQueue<Commit> commitQueue; + private final ExecutorService commitWorkers; + private final AtomicLong activeCommitBytes; + private final Supplier<Boolean> shouldCommitWork; + private final int numCommitWorkers; + + private StreamingApplianceWorkCommitter( + Consumer<CommitWorkRequest> commitWork, + int numCommitWorkers, + Supplier<Boolean> shouldCommitWork) { + this.commitWork = commitWork; + this.commitQueue = + WeightedBoundedQueue.create( + MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitWorkers = + Executors.newFixedThreadPool( + numCommitWorkers, + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MAX_PRIORITY) + .setNameFormat("CommitThread-%d") + .build()); + this.activeCommitBytes = new AtomicLong(); + this.shouldCommitWork = shouldCommitWork; + this.numCommitWorkers = numCommitWorkers; + } + + static StreamingApplianceWorkCommitter create( + Consumer<CommitWorkRequest> commitWork, + int numCommitWorkers, + Supplier<Boolean> shouldCommitWork, + CountDownLatch ready) { + StreamingApplianceWorkCommitter workCommitter = + new StreamingApplianceWorkCommitter(commitWork, numCommitWorkers, shouldCommitWork); + workCommitter.startCommitWorkers(ready); + return workCommitter; + } + + @Override + public void commit(Commit commit) { + commitQueue.put(commit); + } + + @Override + public long currentActiveCommitBytes() { + return activeCommitBytes.get(); + } + + @Override + public void stop() { + commitWorkers.shutdownNow(); + } + + @Override + public int parallelism() { + return numCommitWorkers; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private 
void startCommitWorkers(CountDownLatch ready) { + for (int i = 0; i < numCommitWorkers; i++) { + commitWorkers.submit( + () -> { + while (true) { + try { + ready.await(); Review Comment: can we get rid of this countdownlatch? It seems that we'd just fall through to blocking on the commitQueue which is contained in this class. Alternatively if that doesn't work for some reason, having some explicit start method on the committer interface seems better than passing in this latch that does so out-of-band. ditto for the SE impl ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/CloseableStream.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import com.google.auto.value.AutoValue; +import java.util.function.Supplier; + +/** + * Wrapper for a {@link WindmillStream} that allows callers to tie an action after the stream is + * finished being used. Has an option for closing code to be a no-op. 
+ */ +@AutoValue +public abstract class CloseableStream<StreamT extends WindmillStream> implements AutoCloseable { + public static <StreamT extends WindmillStream> CloseableStream<StreamT> create( + Supplier<StreamT> stream, Runnable onClose) { + return new AutoValue_CloseableStream<>(stream, onClose); + } + + public static <StreamT extends WindmillStream> CloseableStream<StreamT> create( + Supplier<StreamT> stream) { + return create(stream, () -> {}); + } + + public abstract Supplier<StreamT> stream(); Review Comment: seems like this should be StreamT, otherwise we could bind a supplier that creates distinct streams on each call, and we're only closing it once. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -446,6 +422,26 @@ public void run() { .build(); this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer, options); + int numCommitThreads = 1; + if (windmillServiceEnabled && options.getWindmillServiceCommitThreads() > 0) { + numCommitThreads = options.getWindmillServiceCommitThreads(); + } + + this.started = new CountDownLatch(1); + this.workCommitter = + windmillServiceEnabled + ? WorkCommitters.createStreamingEngineWorkCommitter( + WindmillStreamPool.create( + NUM_COMMIT_STREAMS, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream, + numCommitThreads, + running::get, + this::onStreamingCommitFailed, + this::onStreamingCommitComplete, + started) + : WorkCommitters.createApplianceWorkCommitter( + windmillServer::commitWork, numCommitThreads, running::get, started); Review Comment: the commit threads is always 1 for appliance, should we remove the param? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/WorkCommitters.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; +import org.apache.beam.sdk.annotations.Internal; + +/** Utility class for creating {@link WorkCommitter} implementations. */ +@Internal +public final class WorkCommitters { Review Comment: This class doesn't seem to add much compared to calling the create methods directly IMO. Since these are internal things I don't think we have to worry much about increasing visibility by grouping different implementations. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Streaming appliance implementation of {@link WorkCommitter}. */ +@ThreadSafe Review Comment: mark all these classes internal ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Streaming appliance implementation of {@link WorkCommitter}. 
*/ +@ThreadSafe +final class StreamingApplianceWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class); + private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; + + private final Consumer<CommitWorkRequest> commitWork; + private final WeightedBoundedQueue<Commit> commitQueue; + private final ExecutorService commitWorkers; + private final AtomicLong activeCommitBytes; + private final Supplier<Boolean> shouldCommitWork; + private final int numCommitWorkers; + + private StreamingApplianceWorkCommitter( + Consumer<CommitWorkRequest> commitWork, + int numCommitWorkers, + Supplier<Boolean> shouldCommitWork) { + this.commitWork = commitWork; + this.commitQueue = + WeightedBoundedQueue.create( + MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitWorkers = + Executors.newFixedThreadPool( + numCommitWorkers, + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MAX_PRIORITY) + .setNameFormat("CommitThread-%d") + .build()); + this.activeCommitBytes = new AtomicLong(); + this.shouldCommitWork = shouldCommitWork; + this.numCommitWorkers = numCommitWorkers; + } + + static StreamingApplianceWorkCommitter create( + Consumer<CommitWorkRequest> commitWork, + int numCommitWorkers, + Supplier<Boolean> shouldCommitWork, + CountDownLatch ready) { + StreamingApplianceWorkCommitter workCommitter = + new StreamingApplianceWorkCommitter(commitWork, numCommitWorkers, shouldCommitWork); + workCommitter.startCommitWorkers(ready); + return workCommitter; + } + + @Override + public void commit(Commit commit) { + commitQueue.put(commit); + } + + @Override + public long currentActiveCommitBytes() { + return activeCommitBytes.get(); + } + + @Override + public void stop() { + commitWorkers.shutdownNow(); + } + + @Override + public int parallelism() { + return numCommitWorkers; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private 
void startCommitWorkers(CountDownLatch ready) { + for (int i = 0; i < numCommitWorkers; i++) { + commitWorkers.submit( + () -> { + while (true) { + try { + ready.await(); + break; + } catch (InterruptedException ignore) { + } + } + commitLoop(); + }); + } + } + + private void commitLoop() { + Map<ComputationState, Windmill.ComputationCommitWorkRequest.Builder> computationRequestMap = + new HashMap<>(); + while (shouldCommitWork.get()) { + computationRequestMap.clear(); + CommitWorkRequest.Builder commitRequestBuilder = CommitWorkRequest.newBuilder(); + long commitBytes = 0; + // Block until we have a commit, then batch with additional commits. + Commit commit; + try { + commit = commitQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } + while (commit != null) { + ComputationState computationState = commit.computationState(); + commit.work().setState(Work.State.COMMITTING); + Windmill.ComputationCommitWorkRequest.Builder computationRequestBuilder = + computationRequestMap.get(computationState); + if (computationRequestBuilder == null) { + computationRequestBuilder = commitRequestBuilder.addRequestsBuilder(); + computationRequestBuilder.setComputationId(computationState.getComputationId()); + computationRequestMap.put(computationState, computationRequestBuilder); + } + computationRequestBuilder.addRequests(commit.request()); + // Send the request if we've exceeded the bytes or there is no more + // pending work. commitBytes is a long, so this cannot overflow. 
+ commitBytes += commit.getSize(); + if (commitBytes >= TARGET_COMMIT_BUNDLE_BYTES) { + break; + } + commit = commitQueue.poll(); + } + commitWork(commitRequestBuilder.build(), commitBytes); + completeWork(computationRequestMap); + } + } + + private void commitWork(CommitWorkRequest commitRequest, long commitBytes) { + LOG.trace("Commit: {}", commitRequest); + activeCommitBytes.addAndGet(commitBytes); + commitWork.accept(commitRequest); + activeCommitBytes.addAndGet(-commitBytes); + } + + private void completeWork( Review Comment: It seems like it would be better if this went through something like the StreamingDataflowWorker.onStreamingCommitComplete handler instead of poking into the ComputationState directly. I think you could reuse the existing method; it doesn't really seem Streaming Engine specific. Not using the ComputationState directly also seems like it will make this easier to test. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/WorkCommitter.java: ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Commits {@link org.apache.beam.runners.dataflow.worker.streaming.Work} that has completed user + * processing back to persistence layer. + */ +@Internal +public interface WorkCommitter { + int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB + + void commit(Commit commit); Review Comment: add comment, notably that this may block ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/WorkCommitter.java: ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Commits {@link org.apache.beam.runners.dataflow.worker.streaming.Work} that has completed user + * processing back to persistence layer. + */ +@Internal +public interface WorkCommitter { + int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB Review Comment: remove? 
It seems like this can be left up to the implementations, and we might not want a constant shared across implementations longer-term. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/WorkCommitter.java: ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Commits {@link org.apache.beam.runners.dataflow.worker.streaming.Work} that has completed user + * processing back to persistence layer. + */ +@Internal Review Comment: mark threadsafe ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Streaming appliance implementation of {@link WorkCommitter}. 
*/ +@ThreadSafe +final class StreamingApplianceWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class); + private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; + + private final Consumer<CommitWorkRequest> commitWork; + private final WeightedBoundedQueue<Commit> commitQueue; + private final ExecutorService commitWorkers; + private final AtomicLong activeCommitBytes; + private final Supplier<Boolean> shouldCommitWork; + private final int numCommitWorkers; + + private StreamingApplianceWorkCommitter( + Consumer<CommitWorkRequest> commitWork, + int numCommitWorkers, + Supplier<Boolean> shouldCommitWork) { + this.commitWork = commitWork; + this.commitQueue = + WeightedBoundedQueue.create( + MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitWorkers = + Executors.newFixedThreadPool( Review Comment: If we hard-code the single thread, this could be a single-threaded executor. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/WorkCommitter.java: ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Commits {@link org.apache.beam.runners.dataflow.worker.streaming.Work} that has completed user + * processing back to persistence layer. + */ +@Internal +public interface WorkCommitter { + int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB + + void commit(Commit commit); + + long currentActiveCommitBytes(); + + void stop(); Review Comment: comment that this may cancel/drop added commits? ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java: ########## @@ -73,7 +73,7 @@ import org.slf4j.LoggerFactory; /** An in-memory Windmill server that offers provided work and data. */ -final class FakeWindmillServer extends WindmillServerStub { +public class FakeWindmillServer extends WindmillServerStub { Review Comment: can you keep this final? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Streaming engine implementation of {@link WorkCommitter}. Commits work back to Streaming Engine + * backend. + */ +@ThreadSafe +final class StreamingEngineWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineWorkCommitter.class); + private static final int COMMIT_BATCH_SIZE = 5; Review Comment: TARGET_COMMIT_BATCH_SIZE? we can generate larger batches we just sleep a little if we're below this size. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Streaming engine implementation of {@link WorkCommitter}. Commits work back to Streaming Engine + * backend. 
+ */ +@ThreadSafe +final class StreamingEngineWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineWorkCommitter.class); + private static final int COMMIT_BATCH_SIZE = 5; + + private final Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory; + private final WeightedBoundedQueue<Commit> commitQueue; + private final ExecutorService commitSenders; + private final AtomicLong activeCommitBytes; + private final Supplier<Boolean> shouldCommitWork; + private final Consumer<Commit> onFailedCommit; + private final Consumer<CompleteCommit> onCommitComplete; + private final int numCommitSenders; + + private StreamingEngineWorkCommitter( + Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory, + int numCommitSenders, + Supplier<Boolean> shouldCommitWork, + Consumer<Commit> onFailedCommit, + Consumer<CompleteCommit> onCommitComplete) { + this.commitWorkStreamFactory = commitWorkStreamFactory; + this.commitQueue = + WeightedBoundedQueue.create( + MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitSenders = + Executors.newFixedThreadPool( + numCommitSenders, + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MAX_PRIORITY) + .setNameFormat("CommitThread-%d") + .build()); + this.activeCommitBytes = new AtomicLong(); + this.shouldCommitWork = shouldCommitWork; + this.onFailedCommit = onFailedCommit; + this.onCommitComplete = onCommitComplete; + this.numCommitSenders = numCommitSenders; + } + + static StreamingEngineWorkCommitter create( + Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory, + int numCommitSenders, + Supplier<Boolean> shouldCommitWork, + Consumer<Commit> onFailedCommit, + Consumer<CompleteCommit> onCommitComplete, + CountDownLatch ready) { + StreamingEngineWorkCommitter workCommitter = + new StreamingEngineWorkCommitter( + commitWorkStreamFactory, + numCommitSenders, + shouldCommitWork, + 
onFailedCommit, + onCommitComplete); + workCommitter.startCommitSenders(ready); + return workCommitter; + } + + @Override + public void commit(Commit commit) { + commitQueue.put(commit); + } + + @Override + public long currentActiveCommitBytes() { + return activeCommitBytes.get(); + } + + @Override + public void stop() { + commitSenders.shutdownNow(); + } + + @Override + public int parallelism() { + return numCommitSenders; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void startCommitSenders(CountDownLatch ready) { + for (int i = 0; i < numCommitSenders; i++) { + commitSenders.submit( + () -> { + while (true) { + try { + ready.await(); + break; + } catch (InterruptedException ignore) { + } + } + streamingCommitLoop(); + }); + } + } + + private void streamingCommitLoop() { + Commit initialCommit = null; + while (shouldCommitWork.get()) { + if (initialCommit == null) { + try { + initialCommit = commitQueue.take(); + } catch (InterruptedException e) { + continue; + } + } + + try (CloseableStream<CommitWorkStream> closeableCommitStream = + commitWorkStreamFactory.get()) { + CommitWorkStream commitStream = closeableCommitStream.stream().get(); + if (!tryAddToCommitStream(initialCommit, commitStream)) { + throw new AssertionError("Initial commit on flushed stream should always be accepted."); + } + // Batch additional commits to the stream and possibly make an un-batched commit the next + // initial commit. + initialCommit = batchCommitsToStream(commitStream); + commitStream.flush(); + } catch (Exception e) { + LOG.error("Error occurred fetching a CommitWorkStream.", e); + } + } + } + + /** Adds the commit to the commitStream if it fits, returning true if it is consumed. */ + private boolean tryAddToCommitStream(Commit commit, CommitWorkStream commitStream) { + Preconditions.checkNotNull(commit); + // Drop commits for failed work. Such commits will be dropped by Windmill anyway. 
+ if (commit.work().isFailed()) { Review Comment: It seems better to do this when getting the initial commit in streamingCommitLoop and in batchCommitsToStream; otherwise we go on to batchCommitsToStream in a weird state where no commit was added to the stream, or we may count commits in batchCommitsToStream that weren't actually added. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/CompleteCommit.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; + +/** + * A {@link Commit} is marked as complete when it has been attempted to be committed back to + * Streaming Engine/Appliance via {@link + * org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub#commitWorkStream(StreamObserver)} + * for Streaming Engine or {@link + * org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub#commitWork(Windmill.CommitWorkRequest, + * StreamObserver)} for Streaming Appliance. + */ +@AutoValue +public abstract class CompleteCommit { + + public static CompleteCommit create(Commit commit, CommitStatus commitStatus) { Review Comment: It might be better if this didn't hold the full commit. If we just need things like tokens and the computationState, we can keep just those. The commit itself may have large protos in it, and we may be able to drop those from memory earlier if we don't need to save them for this completion. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Streaming appliance implementation of {@link WorkCommitter}. 
*/ +@ThreadSafe +final class StreamingApplianceWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class); + private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; + + private final Consumer<CommitWorkRequest> commitWork; + private final WeightedBoundedQueue<Commit> commitQueue; + private final ExecutorService commitWorkers; + private final AtomicLong activeCommitBytes; + private final Supplier<Boolean> shouldCommitWork; + private final int numCommitWorkers; + + private StreamingApplianceWorkCommitter( + Consumer<CommitWorkRequest> commitWork, + int numCommitWorkers, + Supplier<Boolean> shouldCommitWork) { + this.commitWork = commitWork; + this.commitQueue = + WeightedBoundedQueue.create( + MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitWorkers = + Executors.newFixedThreadPool( + numCommitWorkers, + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MAX_PRIORITY) + .setNameFormat("CommitThread-%d") + .build()); + this.activeCommitBytes = new AtomicLong(); + this.shouldCommitWork = shouldCommitWork; + this.numCommitWorkers = numCommitWorkers; + } + + static StreamingApplianceWorkCommitter create( + Consumer<CommitWorkRequest> commitWork, + int numCommitWorkers, + Supplier<Boolean> shouldCommitWork, + CountDownLatch ready) { + StreamingApplianceWorkCommitter workCommitter = + new StreamingApplianceWorkCommitter(commitWork, numCommitWorkers, shouldCommitWork); + workCommitter.startCommitWorkers(ready); + return workCommitter; + } + + @Override + public void commit(Commit commit) { + commitQueue.put(commit); + } + + @Override + public long currentActiveCommitBytes() { + return activeCommitBytes.get(); + } + + @Override + public void stop() { + commitWorkers.shutdownNow(); + } + + @Override + public int parallelism() { + return numCommitWorkers; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private 
void startCommitWorkers(CountDownLatch ready) { + for (int i = 0; i < numCommitWorkers; i++) { + commitWorkers.submit( + () -> { + while (true) { + try { + ready.await(); + break; + } catch (InterruptedException ignore) { + } + } + commitLoop(); + }); + } + } + + private void commitLoop() { + Map<ComputationState, Windmill.ComputationCommitWorkRequest.Builder> computationRequestMap = + new HashMap<>(); + while (shouldCommitWork.get()) { Review Comment: Instead of injecting this bool to poll, just have some internal state that is set by stop(), or better yet, just rely on the interrupt that stop() triggers. Ditto for the SE implementation. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Streaming engine implementation of {@link WorkCommitter}. Commits work back to Streaming Engine + * backend. 
+ */ +@ThreadSafe +final class StreamingEngineWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineWorkCommitter.class); + private static final int COMMIT_BATCH_SIZE = 5; + + private final Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory; + private final WeightedBoundedQueue<Commit> commitQueue; + private final ExecutorService commitSenders; + private final AtomicLong activeCommitBytes; + private final Supplier<Boolean> shouldCommitWork; + private final Consumer<Commit> onFailedCommit; + private final Consumer<CompleteCommit> onCommitComplete; + private final int numCommitSenders; + + private StreamingEngineWorkCommitter( + Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory, + int numCommitSenders, + Supplier<Boolean> shouldCommitWork, + Consumer<Commit> onFailedCommit, + Consumer<CompleteCommit> onCommitComplete) { + this.commitWorkStreamFactory = commitWorkStreamFactory; + this.commitQueue = + WeightedBoundedQueue.create( + MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitSenders = + Executors.newFixedThreadPool( + numCommitSenders, + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MAX_PRIORITY) + .setNameFormat("CommitThread-%d") + .build()); + this.activeCommitBytes = new AtomicLong(); + this.shouldCommitWork = shouldCommitWork; + this.onFailedCommit = onFailedCommit; + this.onCommitComplete = onCommitComplete; + this.numCommitSenders = numCommitSenders; + } + + static StreamingEngineWorkCommitter create( + Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory, + int numCommitSenders, + Supplier<Boolean> shouldCommitWork, + Consumer<Commit> onFailedCommit, + Consumer<CompleteCommit> onCommitComplete, + CountDownLatch ready) { + StreamingEngineWorkCommitter workCommitter = + new StreamingEngineWorkCommitter( + commitWorkStreamFactory, + numCommitSenders, + shouldCommitWork, + 
onFailedCommit, + onCommitComplete); + workCommitter.startCommitSenders(ready); + return workCommitter; + } + + @Override + public void commit(Commit commit) { + commitQueue.put(commit); + } + + @Override + public long currentActiveCommitBytes() { + return activeCommitBytes.get(); + } + + @Override + public void stop() { + commitSenders.shutdownNow(); + } + + @Override + public int parallelism() { + return numCommitSenders; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void startCommitSenders(CountDownLatch ready) { + for (int i = 0; i < numCommitSenders; i++) { + commitSenders.submit( + () -> { + while (true) { + try { + ready.await(); + break; + } catch (InterruptedException ignore) { + } + } + streamingCommitLoop(); + }); + } + } + + private void streamingCommitLoop() { + Commit initialCommit = null; + while (shouldCommitWork.get()) { + if (initialCommit == null) { + try { + initialCommit = commitQueue.take(); Review Comment: // Block until we have a commit or are shutting down. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Streaming appliance implementation of {@link WorkCommitter}. */ +@ThreadSafe +final class StreamingApplianceWorkCommitter implements WorkCommitter { + private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class); + private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; + + private final Consumer<CommitWorkRequest> commitWork; Review Comment: Could we add some suffix to make it clearer that this is a function/consumer? It makes the code below a little confusing to read, since there is a function of the same name.
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -1396,163 +1392,36 @@ private WorkItemCommitRequest buildWorkItemTruncationRequest( return outputBuilder.build(); } - private void commitLoop() { - Map<ComputationState, Windmill.ComputationCommitWorkRequest.Builder> computationRequestMap = - new HashMap<>(); - while (running.get()) { - computationRequestMap.clear(); - Windmill.CommitWorkRequest.Builder commitRequestBuilder = - Windmill.CommitWorkRequest.newBuilder(); - long commitBytes = 0; - // Block until we have a commit, then batch with additional commits. - Commit commit = null; - try { - commit = commitQueue.take(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - continue; - } - while (commit != null) { - ComputationState computationState = commit.computationState(); - commit.work().setState(Work.State.COMMITTING); - Windmill.ComputationCommitWorkRequest.Builder computationRequestBuilder = - computationRequestMap.get(computationState); - if (computationRequestBuilder == null) { - computationRequestBuilder = commitRequestBuilder.addRequestsBuilder(); - computationRequestBuilder.setComputationId(computationState.getComputationId()); - computationRequestMap.put(computationState, computationRequestBuilder); - } - computationRequestBuilder.addRequests(commit.request()); - // Send the request if we've exceeded the bytes or there is no more - // pending work. commitBytes is a long, so this cannot overflow. 
- commitBytes += commit.getSize(); - if (commitBytes >= TARGET_COMMIT_BUNDLE_BYTES) { - break; - } - commit = commitQueue.poll(); - } - Windmill.CommitWorkRequest commitRequest = commitRequestBuilder.build(); - LOG.trace("Commit: {}", commitRequest); - activeCommitBytes.addAndGet(commitBytes); - windmillServer.commitWork(commitRequest); - activeCommitBytes.addAndGet(-commitBytes); - for (Map.Entry<ComputationState, Windmill.ComputationCommitWorkRequest.Builder> entry : - computationRequestMap.entrySet()) { - ComputationState computationState = entry.getKey(); - for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) { - computationState.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()), - WorkId.builder() - .setCacheToken(workRequest.getCacheToken()) - .setWorkToken(workRequest.getWorkToken()) - .build()); - } - } - } - } - - // Adds the commit to the commitStream if it fits, returning true iff it is consumed. - private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) { - Preconditions.checkNotNull(commit); - final ComputationState state = commit.computationState(); - final Windmill.WorkItemCommitRequest request = commit.request(); - // Drop commits for failed work. Such commits will be dropped by Windmill anyway. 
- if (commit.work().isFailed()) { - readerCache.invalidateReader( - WindmillComputationKey.create( - state.getComputationId(), request.getKey(), request.getShardingKey())); - stateCache - .forComputation(state.getComputationId()) - .invalidate(request.getKey(), request.getShardingKey()); - state.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(request.getKey(), request.getShardingKey()), - WorkId.builder() - .setWorkToken(request.getWorkToken()) - .setCacheToken(request.getCacheToken()) - .build()); - return true; - } - - final int size = commit.getSize(); - commit.work().setState(Work.State.COMMITTING); - activeCommitBytes.addAndGet(size); - if (commitStream.commitWorkItem( - state.getComputationId(), - request, - (Windmill.CommitStatus status) -> { - if (status != Windmill.CommitStatus.OK) { - readerCache.invalidateReader( - WindmillComputationKey.create( - state.getComputationId(), request.getKey(), request.getShardingKey())); - stateCache - .forComputation(state.getComputationId()) - .invalidate(request.getKey(), request.getShardingKey()); - } - activeCommitBytes.addAndGet(-size); - state.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(request.getKey(), request.getShardingKey()), - WorkId.builder() - .setCacheToken(request.getCacheToken()) - .setWorkToken(request.getWorkToken()) - .build()); - })) { - return true; - } else { - // Back out the stats changes since the commit wasn't consumed. - commit.work().setState(Work.State.COMMIT_QUEUED); - activeCommitBytes.addAndGet(-size); - return false; - } + private void onStreamingCommitFailed(Commit commit) { Review Comment: can we get rid of the separate failed handler? Can we just create a CompletedCommit with appropriate error status and use single onCompletedCommit handler? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
