This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 57aa9ef [BEAM-7676] uniquely identify SDK workers by the factory that
created them
new 45e07fb Merge pull request #8990 from ibzib/worker-id
57aa9ef is described below
commit 57aa9ef5bbf32f4803e2b53b81c9b0a731479d66
Author: Kyle Weaver <[email protected]>
AuthorDate: Tue Jul 2 14:05:25 2019 -0700
[BEAM-7676] uniquely identify SDK workers by the factory that created them
---
.../beam/runners/fnexecution/control/DefaultJobBundleFactory.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 5277423..e37794c 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -81,7 +81,9 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
public class DefaultJobBundleFactory implements JobBundleFactory {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultJobBundleFactory.class);
+ private static final IdGenerator factoryIdGenerator =
IdGenerators.incrementingLongs();
+ private final String factoryId = factoryIdGenerator.getId();
private final LoadingCache<Environment, WrappedSdkHarnessClient>
environmentCache;
private final Map<String, EnvironmentFactory.Provider>
environmentFactoryProviderMap;
private final ExecutorService executor;
@@ -112,11 +114,11 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
DefaultJobBundleFactory(
JobInfo jobInfo, Map<String, EnvironmentFactory.Provider>
environmentFactoryMap) {
- IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
+ IdGenerator stageIdSuffixGenerator = IdGenerators.incrementingLongs();
this.environmentFactoryProviderMap = environmentFactoryMap;
this.executor = Executors.newCachedThreadPool();
this.clientPool = MapControlClientPool.create();
- this.stageIdGenerator = stageIdGenerator;
+ this.stageIdGenerator = () -> factoryId + "-" +
stageIdSuffixGenerator.getId();
this.environmentExpirationMillis = getEnvironmentExpirationMillis(jobInfo);
this.environmentCache =
createEnvironmentCache(serverFactory -> createServerInfo(jobInfo,
serverFactory));