[
https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=114436&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114436
]
ASF GitHub Bot logged work on BEAM-4145:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Jun/18 19:39
Start Date: 21/Jun/18 19:39
Worklog Time Spent: 10m
Work Description: jkff closed pull request #5680: [BEAM-4145] Populate
the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5680
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
index 588153fd933..a849d71031b 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
@@ -89,6 +89,7 @@ public RemoteEnvironment createEnvironment(Environment
container) throws Excepti
() -> {
try {
FnHarness.main(
+ "id",
options,
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 0d9c3b21323..cd9f8b03df7 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -132,6 +132,7 @@ public void setup() throws Exception {
sdkHarnessExecutor.submit(
() ->
FnHarness.main(
+ "id",
PipelineOptionsFactory.create(),
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
diff --git
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java
deleted file mode 100644
index e134aecc5be..00000000000
---
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.reference.testing;
-
-import io.grpc.ManagedChannel;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
-
-/**
- * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses
in-process channels.
- *
- * <p>The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the
unique in-process name.
- */
-public class InProcessManagedChannelFactory extends ManagedChannelFactory {
-
- @Override
- public ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor) {
- return
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
- }
-}
diff --git a/sdks/go/pkg/beam/util/grpcx/metadata.go
b/sdks/go/pkg/beam/util/grpcx/metadata.go
index eed51ede439..0d77d62a8f3 100644
--- a/sdks/go/pkg/beam/util/grpcx/metadata.go
+++ b/sdks/go/pkg/beam/util/grpcx/metadata.go
@@ -24,7 +24,7 @@ import (
"google.golang.org/grpc/metadata"
)
-const idKey = "id"
+const idKey = "worker_id"
// ReadWorkerID reads the worker ID from an incoming gRPC request context.
func ReadWorkerID(ctx context.Context) (string, error) {
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 1c80e0bab94..ad7a35d2972 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -92,6 +92,7 @@ func main() {
// (3) Invoke the Java harness, preserving artifact ordering in
classpath.
+ os.Setenv("HARNESS_ID", *id)
os.Setenv("PIPELINE_OPTIONS", options)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *loggingEndpoint}))
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *controlEndpoint}))
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
index 0a4a35d58f1..57d2c68e363 100644
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.fn.channel;
+import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
@@ -26,11 +27,10 @@
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import java.net.SocketAddress;
+import java.util.List;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-/**
- * A Factory which creates an underlying {@link ManagedChannel} implementation.
- */
+/** A Factory which creates an underlying {@link ManagedChannel}
implementation. */
public abstract class ManagedChannelFactory {
public static ManagedChannelFactory createDefault() {
return new Default();
@@ -41,7 +41,20 @@ public static ManagedChannelFactory createEpoll() {
return new Epoll();
}
- public abstract ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor);
+ public ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor) {
+ return builderFor(apiServiceDescriptor).build();
+ }
+
+ /** Create a {@link ManagedChannelBuilder} for the provided {@link
ApiServiceDescriptor}. */
+ protected abstract ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor
descriptor);
+
+ /**
+ * Returns a {@link ManagedChannelFactory} like this one, but which will
apply the provided {@link
+ * ClientInterceptor ClientInterceptors} to any channel it creates.
+ */
+ public ManagedChannelFactory withInterceptors(List<ClientInterceptor>
interceptors) {
+ return new InterceptedManagedChannelFactory(this, interceptors);
+ }
/**
* Creates a {@link ManagedChannel} backed by an {@link
EpollDomainSocketChannel} if the address
@@ -50,17 +63,18 @@ public static ManagedChannelFactory createEpoll() {
*/
private static class Epoll extends ManagedChannelFactory {
@Override
- public ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor) {
+ public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor
apiServiceDescriptor) {
SocketAddress address =
SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
return NettyChannelBuilder.forAddress(address)
- .channelType(address instanceof DomainSocketAddress
- ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
+ .channelType(
+ address instanceof DomainSocketAddress
+ ? EpollDomainSocketChannel.class
+ : EpollSocketChannel.class)
.eventLoopGroup(new EpollEventLoopGroup())
.usePlaintext(true)
// Set the message size to max value here. The actual size is
governed by the
// buffer size in the layers above.
- .maxInboundMessageSize(Integer.MAX_VALUE)
- .build();
+ .maxInboundMessageSize(Integer.MAX_VALUE);
}
}
@@ -70,13 +84,38 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor) {
*/
private static class Default extends ManagedChannelFactory {
@Override
- public ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor) {
+ public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor
apiServiceDescriptor) {
return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
.usePlaintext(true)
// Set the message size to max value here. The actual size is
governed by the
// buffer size in the layers above.
- .maxInboundMessageSize(Integer.MAX_VALUE)
- .build();
+ .maxInboundMessageSize(Integer.MAX_VALUE);
+ }
+ }
+
+ private static class InterceptedManagedChannelFactory extends
ManagedChannelFactory {
+ private final ManagedChannelFactory channelFactory;
+ private final List<ClientInterceptor> interceptors;
+
+ private InterceptedManagedChannelFactory(
+ ManagedChannelFactory managedChannelFactory, List<ClientInterceptor>
interceptors) {
+ this.channelFactory = managedChannelFactory;
+ this.interceptors = interceptors;
+ }
+
+ @Override
+ public ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor) {
+ return builderFor(apiServiceDescriptor).intercept(interceptors).build();
+ }
+
+ @Override
+ protected ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor
descriptor) {
+ return channelFactory.builderFor(descriptor);
+ }
+
+ @Override
+ public ManagedChannelFactory withInterceptors(List<ClientInterceptor>
interceptors) {
+ return new InterceptedManagedChannelFactory(channelFactory,
interceptors);
}
}
}
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java
index 787047b7f6e..6e0d87fe907 100644
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.fn.test;
-import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
@@ -35,7 +35,7 @@ public static ManagedChannelFactory create() {
private InProcessManagedChannelFactory() {}
@Override
- public ManagedChannel forDescriptor(ApiServiceDescriptor
apiServiceDescriptor) {
- return
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+ public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor
apiServiceDescriptor) {
+ return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl());
}
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index b6e81a735a4..aa6a6fb3089 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -50,18 +50,21 @@
* Main entry point into the Beam SDK Fn Harness for Java.
*
* <p>This entry point expects the following environment variables:
+ *
* <ul>
- * <li>LOGGING_API_SERVICE_DESCRIPTOR: A
- * {@link org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor}
encoded as text
- * representing the endpoint that is to be connected to for the Beam Fn
Logging service.</li>
- * <li>CONTROL_API_SERVICE_DESCRIPTOR: A
- * {@link Endpoints.ApiServiceDescriptor} encoded as text
- * representing the endpoint that is to be connected to for the Beam Fn
Control service.</li>
+ * <li>HARNESS_ID: A String representing the ID of this FnHarness. This will
be added to the
+ * headers of calls to the Beam Control Service
+ * <li>LOGGING_API_SERVICE_DESCRIPTOR: A {@link
+ * org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor}
encoded as text
+ * representing the endpoint that is to be connected to for the Beam Fn
Logging service.
+ * <li>CONTROL_API_SERVICE_DESCRIPTOR: A {@link
Endpoints.ApiServiceDescriptor} encoded as text
+ * representing the endpoint that is to be connected to for the Beam Fn
Control service.
* <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See
{@link PipelineOptions}
- * for further details.</li>
+ * for further details.
* </ul>
*/
public class FnHarness {
+ private static final String HARNESS_ID = "HARNESS_ID";
private static final String CONTROL_API_SERVICE_DESCRIPTOR =
"CONTROL_API_SERVICE_DESCRIPTOR";
private static final String LOGGING_API_SERVICE_DESCRIPTOR =
"LOGGING_API_SERVICE_DESCRIPTOR";
private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
@@ -77,14 +80,17 @@
public static void main(String[] args) throws Exception {
System.out.format("SDK Fn Harness started%n");
+ System.out.format("Harness ID %s%n", System.getenv(HARNESS_ID));
System.out.format("Logging location %s%n",
System.getenv(LOGGING_API_SERVICE_DESCRIPTOR));
System.out.format("Control location %s%n",
System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
System.out.format("Pipeline options %s%n",
System.getenv(PIPELINE_OPTIONS));
- ObjectMapper objectMapper = new ObjectMapper().registerModules(
- ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
- PipelineOptions options = objectMapper.readValue(
- System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
+ String id = System.getenv(HARNESS_ID);
+ ObjectMapper objectMapper =
+ new ObjectMapper()
+
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+ PipelineOptions options =
+ objectMapper.readValue(System.getenv(PIPELINE_OPTIONS),
PipelineOptions.class);
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR);
@@ -92,12 +98,15 @@ public static void main(String[] args) throws Exception {
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR);
- main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
+ main(id, options, loggingApiServiceDescriptor,
controlApiServiceDescriptor);
}
- public static void main(PipelineOptions options,
+ public static void main(
+ String id,
+ PipelineOptions options,
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
- Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws
Exception {
+ Endpoints.ApiServiceDescriptor controlApiServiceDescriptor)
+ throws Exception {
ManagedChannelFactory channelFactory;
List<String> experiments =
options.as(ExperimentalOptions.class).getExperiments();
if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
@@ -108,6 +117,7 @@ public static void main(PipelineOptions options,
OutboundObserverFactory outboundObserverFactory =
HarnessStreamObserverFactories.fromOptions(options);
main(
+ id,
options,
loggingApiServiceDescriptor,
controlApiServiceDescriptor,
@@ -116,47 +126,46 @@ public static void main(PipelineOptions options,
}
public static void main(
+ String id,
PipelineOptions options,
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
ManagedChannelFactory channelFactory,
OutboundObserverFactory outboundObserverFactory) {
IdGenerator idGenerator = IdGenerators.decrementingLongs();
- try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
- options,
- loggingApiServiceDescriptor,
- channelFactory::forDescriptor)) {
+ // The logging client variable is not used per se, but during its lifetime
(until close()) it
+ // intercepts logging and sends it to the logging service.
+ try (BeamFnLoggingClient logging =
+ new BeamFnLoggingClient(
+ options, loggingApiServiceDescriptor,
channelFactory::forDescriptor)) {
LOG.info("Fn Harness started");
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
- EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+ EnumMap<
+ BeamFnApi.InstructionRequest.RequestCase,
ThrowingFunction<InstructionRequest, Builder>>
handlers = new
EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
RegisterHandler fnApiRegistry = new RegisterHandler();
- BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(
- options, channelFactory::forDescriptor, outboundObserverFactory);
+ BeamFnDataGrpcClient beamFnDataMultiplexer =
+ new BeamFnDataGrpcClient(
+ options, channelFactory::forDescriptor, outboundObserverFactory);
BeamFnStateGrpcClientCache beamFnStateGrpcClientCache =
new BeamFnStateGrpcClientCache(
idGenerator, channelFactory::forDescriptor,
outboundObserverFactory);
- ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(
- options,
- fnApiRegistry::getById,
- beamFnDataMultiplexer,
- beamFnStateGrpcClientCache);
- handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
- fnApiRegistry::register);
- handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
+ ProcessBundleHandler processBundleHandler =
+ new ProcessBundleHandler(
+ options, fnApiRegistry::getById, beamFnDataMultiplexer,
beamFnStateGrpcClientCache);
+ handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
fnApiRegistry::register);
+ handlers.put(
+ BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
processBundleHandler::processBundle);
BeamFnControlClient control =
new BeamFnControlClient(
- controlApiServiceDescriptor,
- channelFactory::forDescriptor,
- outboundObserverFactory,
- handlers);
+ id, controlApiServiceDescriptor, channelFactory,
outboundObserverFactory, handlers);
LOG.info("Entering instruction processing loop");
control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java
new file mode 100644
index 00000000000..8a215607171
--- /dev/null
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.grpc.ClientInterceptor;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.stub.MetadataUtils;
+
+/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to
outgoing messages. */
+public class AddHarnessIdInterceptor {
+ private static final Key<String> ID_KEY = Key.of("worker_id",
Metadata.ASCII_STRING_MARSHALLER);
+
+ public static ClientInterceptor create(String harnessId) {
+ checkArgument(harnessId != null, "harnessId must not be null");
+ Metadata md = new Metadata();
+ md.put(ID_KEY, harnessId);
+ return MetadataUtils.newAttachHeadersInterceptor(md);
+ }
+
+ // This is implemented via MetadataUtils, so we never actually create an
instance of this class
+ private AddHarnessIdInterceptor() {}
+}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index d0df30900d1..cfd8ff88fbe 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -20,8 +20,8 @@
import static com.google.common.base.Throwables.getStackTraceAsString;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
-import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.EnumMap;
@@ -30,10 +30,10 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.function.Function;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
-import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.slf4j.Logger;
@@ -67,8 +67,9 @@
private final CompletableFuture<Object> onFinish;
public BeamFnControlClient(
- Endpoints.ApiServiceDescriptor apiServiceDescriptor,
- Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
+ String id,
+ ApiServiceDescriptor apiServiceDescriptor,
+ ManagedChannelFactory channelFactory,
OutboundObserverFactory outboundObserverFactory,
EnumMap<
BeamFnApi.InstructionRequest.RequestCase,
@@ -77,7 +78,11 @@ public BeamFnControlClient(
this.bufferedInstructions = new LinkedBlockingDeque<>();
this.outboundObserver =
outboundObserverFactory.outboundObserverFor(
-
BeamFnControlGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::control,
+ BeamFnControlGrpc.newStub(
+ channelFactory
+
.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)))
+ .forDescriptor(apiServiceDescriptor))
+ ::control,
new InboundObserver());
this.handlers = handlers;
this.onFinish = new CompletableFuture<>();
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index 0e68f6d8dd1..8dc43e56727 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -118,7 +118,7 @@ public void testLaunchFnHarnessAndTeardownCleanly() throws
Exception {
.setUrl("localhost:" + controlServer.getPort())
.build();
- FnHarness.main(options, loggingDescriptor, controlDescriptor);
+ FnHarness.main("id", options, loggingDescriptor, controlDescriptor);
assertThat(instructionResponses, contains(INSTRUCTION_RESPONSE));
} finally {
controlServer.shutdownNow();
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index 4f465647f73..4de6f9e4aeb 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -27,9 +27,7 @@
import static org.junit.Assert.fail;
import com.google.common.util.concurrent.Uninterruptibles;
-import io.grpc.ManagedChannel;
import io.grpc.Server;
-import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
@@ -49,6 +47,7 @@
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -117,9 +116,6 @@ public void testDelegation() throws Exception {
.build();
server.start();
try {
- ManagedChannel channel =
-
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
-
EnumMap<
BeamFnApi.InstructionRequest.RequestCase,
ThrowingFunction<BeamFnApi.InstructionRequest,
BeamFnApi.InstructionResponse.Builder>>
@@ -137,8 +133,9 @@ public void testDelegation() throws Exception {
BeamFnControlClient client =
new BeamFnControlClient(
+ "",
apiServiceDescriptor,
- (Endpoints.ApiServiceDescriptor descriptor) -> channel,
+ InProcessManagedChannelFactory.create(),
OutboundObserverFactory.trivial(),
handlers);
@@ -204,9 +201,6 @@ public void testJavaErrorResponse() throws Exception {
.build();
server.start();
try {
- ManagedChannel channel =
-
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
-
EnumMap<
BeamFnApi.InstructionRequest.RequestCase,
ThrowingFunction<BeamFnApi.InstructionRequest,
BeamFnApi.InstructionResponse.Builder>>
@@ -219,8 +213,9 @@ public void testJavaErrorResponse() throws Exception {
BeamFnControlClient client =
new BeamFnControlClient(
+ "",
apiServiceDescriptor,
- (Endpoints.ApiServiceDescriptor descriptor) -> channel,
+ InProcessManagedChannelFactory.create(),
OutboundObserverFactory.trivial(),
handlers);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 114436)
Time Spent: 4h 50m (was: 4h 40m)
> Java SDK Harness populates control request headers with worker id
> -----------------------------------------------------------------
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Eugene Kirpichov
> Priority: Minor
> Fix For: 2.6.0
>
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)