This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 57aa9ef  [BEAM-7676] uniquely identify SDK workers by the factory that created them
     new 45e07fb  Merge pull request #8990 from ibzib/worker-id
57aa9ef is described below

commit 57aa9ef5bbf32f4803e2b53b81c9b0a731479d66
Author: Kyle Weaver <[email protected]>
AuthorDate: Tue Jul 2 14:05:25 2019 -0700

    [BEAM-7676] uniquely identify SDK workers by the factory that created them
---
 .../beam/runners/fnexecution/control/DefaultJobBundleFactory.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 5277423..e37794c 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -81,7 +81,9 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public class DefaultJobBundleFactory implements JobBundleFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultJobBundleFactory.class);
+  private static final IdGenerator factoryIdGenerator = 
IdGenerators.incrementingLongs();
 
+  private final String factoryId = factoryIdGenerator.getId();
   private final LoadingCache<Environment, WrappedSdkHarnessClient> 
environmentCache;
   private final Map<String, EnvironmentFactory.Provider> 
environmentFactoryProviderMap;
   private final ExecutorService executor;
@@ -112,11 +114,11 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
 
   DefaultJobBundleFactory(
       JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> 
environmentFactoryMap) {
-    IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
+    IdGenerator stageIdSuffixGenerator = IdGenerators.incrementingLongs();
     this.environmentFactoryProviderMap = environmentFactoryMap;
     this.executor = Executors.newCachedThreadPool();
     this.clientPool = MapControlClientPool.create();
-    this.stageIdGenerator = stageIdGenerator;
+    this.stageIdGenerator = () -> factoryId + "-" + 
stageIdSuffixGenerator.getId();
     this.environmentExpirationMillis = getEnvironmentExpirationMillis(jobInfo);
     this.environmentCache =
         createEnvironmentCache(serverFactory -> createServerInfo(jobInfo, 
serverFactory));

Reply via email to