scwhittle commented on code in PR #28428: URL: https://github.com/apache/beam/pull/28428#discussion_r1328633216
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/ReadOnlyWindmillConnectionsCache.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.connectionscache; + +import java.util.Optional; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; + +/** Read only interface for {@link WindmillConnectionsCache}. */ +public interface ReadOnlyWindmillConnectionsCache { + + /** + * Used by GetDataStream and CommitWorkStream calls inorder to make sure Windmill API calls get + * routed to the same workers that GetWork was initiated on. If no {@link WindmillConnection} is + * found, it means that the windmill worker working on the key range has moved on (ranges have + * moved to other workers, worker crash, etc.). 
+ * + * @see <a + * href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill + * API</a> + */ + Optional<WindmillConnection> getWindmillWorkerConnection(WindmillConnectionCacheToken token); + + /** + * Used by GetWorkStream to initiate the Windmill API calls and fetch Work items. + * + * @see <a + * href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill + * API</a> + */ + WindmillConnection getNewWindmillWorkerConnection(); + + CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub getDispatcherStub(); + + Optional<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub> getGlobalDataStub( + Windmill.GlobalDataId globalDataId); + + Optional<WindmillApplianceGrpc.WindmillApplianceBlockingStub> getWindmillApplianceStub(); Review Comment: Does this need to be part of this interface? It seems cleaner to keep the appliance stuff separate. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java: ########## @@ -51,6 +52,30 @@ void receiveWork( Windmill.WorkItem workItem, Collection<Windmill.LatencyAttribution> getWorkStreamLatencies); } + + @AutoValue + abstract class GetWorkBudget { + public static GetWorkBudget.Builder builder() { + return new AutoValue_WindmillStream_GetWorkStream_GetWorkBudget.Builder(); + } + + public GetWorkBudget consumeBudgetUpdate(long bytes, long items) { Review Comment: "consume" made me think it was going to subtract. How about just "increment" or "add"? It doesn't really matter where the update came from. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/WindmillChannelFactory.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.connectionscache; + +import java.net.Inet6Address; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLException; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; + +/** Utilities for creating {@link Channel} for gRPC stubs. 
*/ +final class WindmillChannelFactory { + static final String LOCALHOST = "localhost"; + private static final int DEFAULT_GRPC_PORT = 443; + + private WindmillChannelFactory() {} + + static Channel inProcessChannel(String channelName) { + return InProcessChannelBuilder.forName(channelName).directExecutor().build(); + } + + static Channel localhostChannel(int port) { + return NettyChannelBuilder.forAddress(LOCALHOST, port) + .maxInboundMessageSize(Integer.MAX_VALUE) + .negotiationType(NegotiationType.PLAINTEXT) + .build(); + } + + static Channel remoteChannel( + WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) { Review Comment: The name windmillServiceRpcChannelTimeoutSec is a little unclear (i.e. it could mean the entire channel lifetime, the connect timeout, etc.). How about channelKeepAliveTimeoutSec? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/ReadOnlyWindmillConnectionsCache.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.connectionscache; + +import java.util.Optional; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; + +/** Read only interface for {@link WindmillConnectionsCache}. */ +public interface ReadOnlyWindmillConnectionsCache { + + /** + * Used by GetDataStream and CommitWorkStream calls inorder to make sure Windmill API calls get + * routed to the same workers that GetWork was initiated on. If no {@link WindmillConnection} is + * found, it means that the windmill worker working on the key range has moved on (ranges have + * moved to other workers, worker crash, etc.). + * + * @see <a + * href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill + * API</a> + */ + Optional<WindmillConnection> getWindmillWorkerConnection(WindmillConnectionCacheToken token); + + /** + * Used by GetWorkStream to initiate the Windmill API calls and fetch Work items. + * + * @see <a + * href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill + * API</a> + */ + WindmillConnection getNewWindmillWorkerConnection(); Review Comment: In the direct user-worker-to-Windmill-worker mode, GetWork needs to be sent to a set of endpoints, not just one. With multiple endpoints, we need to think through how and when we find out about them. One mechanism would be to keep the existing interface and poll it periodically, but that introduces latency. So we might want to move to a design where this connections cache either notifies something else (e.g. via a Consumer) of endpoint deltas, or owns the streams itself.
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/WindmillConnectionsCache.java: ########## @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.connectionscache; + +import static org.apache.beam.runners.dataflow.worker.windmill.connectionscache.WindmillChannelFactory.LOCALHOST; +import static org.apache.beam.runners.dataflow.worker.windmill.connectionscache.WindmillChannelFactory.localhostChannel; +import static org.apache.beam.runners.dataflow.worker.windmill.connectionscache.WindmillChannelFactory.remoteChannel; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import com.google.auto.value.AutoValue; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ThreadSafe +final class WindmillConnectionsCache implements ReadWriteWindmillConnectionsCache { + private static final Logger LOG = LoggerFactory.getLogger(WindmillConnectionsCache.class); + + private final Object dispatcherConnectionLock; + + @GuardedBy("dispatcherConnectionLock") + private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs; + + @GuardedBy("dispatcherConnectionLock") + private final Set<HostAndPort> dispatcherEndpoints; + + @GuardedBy("dispatcherConnectionLock") + private final Random rand; + + /** + * Writes are guarded by synchronization on "this". Reads are done grabbing the {@link Review Comment: Are writes actually guarded by synchronization on dispatcherConnectionLock rather than on "this"? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/WindmillConnectionCacheToken.java: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.connectionscache; + +import com.google.auto.value.AutoValue; +import java.util.UUID; + +@AutoValue +public abstract class WindmillConnectionCacheToken { Review Comment: Do we need the tokens for Java? With garbage collection, perhaps we can just hand out per-endpoint objects to associate with work items. Then we don't have to go back to the cache to look up the endpoint by token for subsequent GetData/Commit calls; we just have the endpoint object itself. As metadata updates come in, the cache can keep the same endpoint objects for endpoints that remain. If an endpoint is removed, we could note that in the object, and subsequent calls using it could fail fast. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/VendoredCredentialsAdapter.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.connectionscache; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; + +/** + * Create a wrapper around credentials that delegates to the underlying {@link + * com.google.auth.Credentials}. Note that this class should override every method that is not final + * and not static and call the delegate directly. + * + * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation Review Comment: I think it means some sort of annotation that generates a proxy class, so that if new methods are added to Credentials, this class doesn't have to be updated. That said, from just looking here I don't see why this class is needed at all. It doesn't seem to modify any behavior; it just wraps the underlying credentials and forwards every call to them. You could try removing it and see if it works (this can be a separate PR), or look through the history more: perhaps it used to do something and should have been removed when that functionality changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
