[ https://issues.apache.org/jira/browse/BEAM-6180?focusedWorklogId=172117&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172117 ]

ASF GitHub Bot logged work on BEAM-6180:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/18 00:44
            Start Date: 05/Dec/18 00:44
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #7201: [BEAM-6180] Make dataflow runner harness use IdGenerator from fnexecution
URL: https://github.com/apache/beam/pull/7201
 
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

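In short, the diff removes the worker-local org.apache.beam.runners.dataflow.worker.fn.IdGenerator (a static generate() utility passed around as Supplier<String>) and switches every call site to the shared org.apache.beam.sdk.fn.IdGenerator interface obtained from IdGenerators.decrementingLongs(). Below is a minimal sketch of the resulting call-site pattern, using only names that appear in the diff; the class name IdGeneratorUsageSketch is illustrative and not part of the PR.

import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;

/** Illustrative sketch only; mirrors the pattern introduced by this PR. */
class IdGeneratorUsageSketch {
  // Replaces the worker-local static IdGenerator.generate() utility that the PR deletes;
  // matches the field added in BatchDataflowWorker and StreamingDataflowWorker.
  private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();

  String nextInstructionId() {
    // Call sites move from Supplier<String>.get() / IdGenerator::generate
    // to IdGenerator.getId(), as seen throughout the diff.
    return idGenerator.getId();
  }
}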

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 2666dea61115..1433702f714d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -36,7 +36,6 @@
 import 
org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness;
 import 
org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import 
org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
 import 
org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction;
 import 
org.apache.beam.runners.dataflow.worker.graph.DeduceFlattenLocationsFunction;
@@ -53,6 +52,8 @@
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.util.Weighted;
 import org.apache.beam.sdk.util.WeightedValue;
 import org.slf4j.Logger;
@@ -80,6 +81,9 @@
   /** The factory to create {@link DataflowMapTaskExecutor 
DataflowMapTaskExecutors}. */
   private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
 
+  /** The idGenerator to generate unique id globally. */
+  private static final IdGenerator idGenerator = 
IdGenerators.decrementingLongs();
+
   /**
    * Function which converts map tasks to their network representation for 
execution.
    *
@@ -91,7 +95,7 @@
    * </ul>
    */
   private static final Function<MapTask, MutableNetwork<Node, Edge>> 
mapTaskToBaseNetwork =
-      new FixMultiOutputInfosOnParDoInstructions(IdGenerator::generate)
+      new FixMultiOutputInfosOnParDoInstructions(idGenerator)
           .andThen(new MapTaskToNetworkFunction());
 
   /** Registry of known {@link ReaderFactory ReaderFactories}. */
@@ -217,18 +221,14 @@ protected BatchDataflowWorker(
       Function<MutableNetwork<Node, Edge>, Node> sdkFusedStage =
           pipeline == null
               ? RegisterNodeFunction.withoutPipeline(
-                  IdGenerator::generate, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                  idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
               : RegisterNodeFunction.forPipeline(
-                  pipeline,
-                  IdGenerator::generate,
-                  sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
+                  pipeline, idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> 
lengthPrefixUnknownCoders =
           LengthPrefixUnknownCoders::forSdkNetwork;
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> 
transformToRunnerNetwork =
           new CreateRegisterFnOperationFunction(
-              IdGenerator::generate,
-              this::createPortNode,
-              lengthPrefixUnknownCoders.andThen(sdkFusedStage));
+              idGenerator, this::createPortNode, 
lengthPrefixUnknownCoders.andThen(sdkFusedStage));
 
       mapTaskToNetwork =
           mapTaskToBaseNetwork
@@ -268,8 +268,8 @@ private Node createPortNode(String predecessorId, String 
successorId) {
         RemoteGrpcPort.newBuilder()
             
.setApiServiceDescriptor(sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
             .build(),
-        IdGenerator.generate(),
-        IdGenerator.generate(),
+        idGenerator.getId(),
+        idGenerator.getId(),
         predecessorId,
         successorId);
   }
@@ -346,7 +346,7 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient 
workItemStatusClient) thr
                 sinkRegistry,
                 executionContext,
                 counterSet,
-                IdGenerator::generate);
+                idGenerator);
       } else if (workItem.getSourceOperationTask() != null) {
         worker =
             SourceOperationExecutorFactory.create(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 8e191e2654a4..a76f326f33b4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -40,7 +40,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
@@ -90,6 +89,7 @@
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -128,7 +128,7 @@ public DataflowMapTaskExecutor create(
       SinkFactory sinkFactory,
       DataflowExecutionContext<?> executionContext,
       CounterSet counterSet,
-      Supplier<String> idGenerator) {
+      IdGenerator idGenerator) {
 
     // TODO: remove this once we trust the code paths
     checkArgument(
@@ -204,7 +204,7 @@ public DataflowMapTaskExecutor create(
 
   private Function<Node, Node> 
createOperationTransformForFetchAndFilterStreamingSideInputNodes(
       MutableNetwork<Node, Edge> network,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       FnDataService beamFnDataService,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
@@ -303,7 +303,7 @@ public Node typedApply(RemoteGrpcPortNode input) {
   }
 
   private Function<Node, Node> createOperationTransformForRegisterFnNodes(
-      final Supplier<String> idGenerator,
+      final IdGenerator idGenerator,
       final InstructionRequestHandler instructionRequestHandler,
       final StateDelegator beamFnStateDelegator,
       final String stageName,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
index b6bcb5a5f51a..4e6e5a11eb08 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
@@ -19,7 +19,6 @@
 
 import com.google.api.services.dataflow.model.MapTask;
 import com.google.common.graph.MutableNetwork;
-import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
@@ -28,6 +27,7 @@
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /** Creates a {@link DataflowMapTaskExecutor} from a {@link MapTask} 
definition. */
@@ -49,5 +49,5 @@ DataflowMapTaskExecutor create(
       SinkFactory sinkFactory,
       DataflowExecutionContext<?> executionContext,
       CounterSet counterSet,
-      Supplier<String> idGenerator);
+      IdGenerator idGenerator);
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java
index aef6e2e26c59..f65e8308192e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java
@@ -26,7 +26,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -36,6 +35,7 @@
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -71,7 +71,7 @@ public FetchAndFilterStreamingSideInputsOperation(
       InstructionRequestHandler instructionRequestHandler,
       FnDataService beamFnDataService,
       ApiServiceDescriptor dataServiceApiServiceDescriptor,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       Coder<WindowedValue<T>> inputCoder,
       WindowingStrategy<?, W> windowingStrategy,
       DataflowExecutionContext.DataflowStepContext stepContext,
@@ -162,7 +162,7 @@ public void finishSpecifying(PInput upstreamInput, 
PTransform<?, ?> upstreamTran
   }
 
   private Iterable<PCollectionView<?>> 
buildPCollectionViewsWithSdkSupportedWindowMappingFn(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       FnDataService beamFnDataService,
       ApiServiceDescriptor dataServiceApiServiceDescriptor,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
index ff798c0346a5..71f5bedbae37 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
@@ -27,7 +27,6 @@
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
@@ -48,6 +47,7 @@
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
@@ -93,7 +93,7 @@ public static CacheKey create(SdkFunctionSpec 
windowMappingFn, BoundedWindow mai
   private static final Cache<CacheKey, BoundedWindow> sideInputMappingCache =
       CacheBuilder.newBuilder().maximumSize(1000).build();
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final FnDataService beamFnDataService;
   private final InstructionRequestHandler instructionRequestHandler;
   private final SdkFunctionSpec windowMappingFn;
@@ -102,7 +102,7 @@ public static CacheKey create(SdkFunctionSpec 
windowMappingFn, BoundedWindow mai
   private final ProcessBundleDescriptor processBundleDescriptor;
 
   FnApiWindowMappingFn(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       ApiServiceDescriptor dataServiceApiServiceDescriptor,
       FnDataService beamFnDataService,
@@ -223,7 +223,7 @@ public TargetWindowT getSideInputWindow(BoundedWindow 
mainWindow) {
 
   private TargetWindowT loadIfNeeded(SdkFunctionSpec windowMappingFn, 
BoundedWindow mainWindow) {
     try {
-      String processRequestInstructionId = idGenerator.get();
+      String processRequestInstructionId = idGenerator.getId();
       InstructionRequest processRequest =
           InstructionRequest.newBuilder()
               .setInstructionId(processRequestInstructionId)
@@ -296,12 +296,12 @@ private TargetWindowT loadIfNeeded(SdkFunctionSpec 
windowMappingFn, BoundedWindo
    */
   private synchronized String registerIfRequired() throws ExecutionException, 
InterruptedException {
     if (processBundleDescriptorId == null) {
-      String descriptorId = idGenerator.get();
+      String descriptorId = idGenerator.getId();
 
       CompletionStage<InstructionResponse> response =
           instructionRequestHandler.handle(
               InstructionRequest.newBuilder()
-                  .setInstructionId(idGenerator.get())
+                  .setInstructionId(idGenerator.getId())
                   .setRegister(
                       RegisterRequest.newBuilder()
                           .addProcessBundleDescriptor(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index e632719bc7a6..99eee23c6ce4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -37,7 +37,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -75,6 +74,7 @@
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -109,7 +109,7 @@ public DataflowMapTaskExecutor create(
       SinkFactory sinkFactory,
       DataflowExecutionContext<?> executionContext,
       CounterSet counterSet,
-      Supplier<String> idGenerator) {
+      IdGenerator idGenerator) {
 
     // TODO: remove this once we trust the code paths
     checkArgument(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 80556a8194d8..d75c70327c3b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -88,7 +88,6 @@
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import 
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import 
org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
 import 
org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction;
 import 
org.apache.beam.runners.dataflow.worker.graph.DeduceFlattenLocationsFunction;
@@ -125,6 +124,8 @@
 import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.StreamPool;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
@@ -143,12 +144,14 @@
 public class StreamingDataflowWorker {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamingDataflowWorker.class);
 
+  /** The idGenerator to generate unique id globally. */
+  private static final IdGenerator idGenerator = 
IdGenerators.decrementingLongs();
   /**
    * Fix up MapTask representation because MultiOutputInfos are missing from 
system generated
    * ParDoInstructions.
    */
   private static final Function<MapTask, MapTask> fixMultiOutputInfos =
-      new FixMultiOutputInfosOnParDoInstructions(IdGenerator::generate);
+      new FixMultiOutputInfosOnParDoInstructions(idGenerator);
 
   /**
    * Function which converts map tasks to their network representation for 
execution.
@@ -627,11 +630,9 @@ public void run() {
       Function<MutableNetwork<Node, Edge>, Node> sdkFusedStage =
           pipeline == null
               ? RegisterNodeFunction.withoutPipeline(
-                  IdGenerator::generate, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                  idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
               : RegisterNodeFunction.forPipeline(
-                  pipeline,
-                  IdGenerator::generate,
-                  sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
+                  pipeline, idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> 
lengthPrefixUnknownCoders =
           LengthPrefixUnknownCoders::forSdkNetwork;
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>>
@@ -640,9 +641,7 @@ public void run() {
 
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> 
transformToRunnerNetwork =
           new CreateRegisterFnOperationFunction(
-              IdGenerator::generate,
-              this::createPortNode,
-              lengthPrefixUnknownCoders.andThen(sdkFusedStage));
+              idGenerator, this::createPortNode, 
lengthPrefixUnknownCoders.andThen(sdkFusedStage));
 
       mapTaskToNetwork =
           mapTaskToBaseNetwork
@@ -669,8 +668,8 @@ private Node createPortNode(String predecessorId, String 
successorId) {
         RemoteGrpcPort.newBuilder()
             
.setApiServiceDescriptor(sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
             .build(),
-        IdGenerator.generate(),
-        IdGenerator.generate(),
+        idGenerator.getId(),
+        idGenerator.getId(),
         predecessorId,
         successorId);
   }
@@ -1153,7 +1152,7 @@ private void process(
                 sinkRegistry,
                 context,
                 pendingDeltaCounters,
-                IdGenerator::generate);
+                idGenerator);
         ReadOperation readOperation = mapTaskExecutor.getReadOperation();
         // Disable progress updates since its results are unused  for streaming
         // and involves starting a thread.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java
index e774bcc68aa0..b161baa386dc 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java
@@ -24,7 +24,7 @@
 import com.google.common.collect.ImmutableList;
 import java.util.List;
 import java.util.function.Function;
-import java.util.function.Supplier;
+import org.apache.beam.sdk.fn.IdGenerator;
 
 /**
  * {@link ParDoInstruction}s are meant to always have {@link MultiOutputInfo}s 
which give names to
@@ -33,9 +33,9 @@
  * should supply ids outside the ids used within the {@link MapTask} to 
prevent collisions.
  */
 public class FixMultiOutputInfosOnParDoInstructions implements 
Function<MapTask, MapTask> {
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
 
-  public FixMultiOutputInfosOnParDoInstructions(Supplier<String> idGenerator) {
+  public FixMultiOutputInfosOnParDoInstructions(IdGenerator idGenerator) {
     this.idGenerator = idGenerator;
   }
 
@@ -50,7 +50,7 @@ public MapTask apply(MapTask input) {
         if (numOutputs != 
Apiary.listOrEmpty(instruction.getParDo().getMultiOutputInfos()).size()) {
           if (numOutputs == 1) {
             parDoInstruction.setMultiOutputInfos(
-                ImmutableList.of(new 
MultiOutputInfo().setTag(idGenerator.get())));
+                ImmutableList.of(new 
MultiOutputInfo().setTag(idGenerator.getId())));
           } else {
             throw new IllegalArgumentException(
                 String.format(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/IdGenerator.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/IdGenerator.java
deleted file mode 100644
index edf25b5fe1ad..000000000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/IdGenerator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * IdGenerator generates a new, unique identifier on each invocation.
- *
- * <p>Consumers of these ids should not make any additional assumptions 
regarding the nature of the
- * returned identifiers. Uniqueness is the only guaranteed property.
- */
-public final class IdGenerator {
-  private static final AtomicLong idGenerator = new AtomicLong(-1);
-
-  public static String generate() {
-    return Long.toString(idGenerator.getAndDecrement());
-  }
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 7dd674f020f8..c316ea71dfb3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -32,7 +32,6 @@
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -62,6 +61,7 @@
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.transforms.Materializations;
@@ -89,7 +89,7 @@
 
   private static final OutputReceiver[] EMPTY_RECEIVERS = new 
OutputReceiver[0];
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final InstructionRequestHandler instructionRequestHandler;
   private final StateDelegator beamFnStateDelegator;
   private final RegisterRequest registerRequest;
@@ -108,7 +108,7 @@
   private String grpcReadTransformOutputName = null;
 
   public RegisterAndProcessBundleOperation(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       StateDelegator beamFnStateDelegator,
       RegisterRequest registerRequest,
@@ -226,7 +226,7 @@ private static String escapeDot(String s) {
    */
   public synchronized String getProcessBundleInstructionId() {
     if (processBundleId == null) {
-      processBundleId = idGenerator.get();
+      processBundleId = idGenerator.getId();
     }
     return processBundleId;
   }
@@ -240,7 +240,7 @@ public void start() throws Exception {
       if (registerFuture == null) {
         InstructionRequest request =
             InstructionRequest.newBuilder()
-                .setInstructionId(idGenerator.get())
+                .setInstructionId(idGenerator.getId())
                 .setRegister(registerRequest)
                 .build();
         registerFuture = instructionRequestHandler.handle(request);
@@ -314,7 +314,7 @@ public void abort() throws Exception {
     }
     InstructionRequest processBundleRequest =
         InstructionRequest.newBuilder()
-            .setInstructionId(idGenerator.get())
+            .setInstructionId(idGenerator.getId())
             .setProcessBundleProgress(
                 
ProcessBundleProgressRequest.newBuilder().setInstructionReference(processBundleId))
             .build();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java
index 7195cc6642ea..2e591a67a657 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java
@@ -19,13 +19,13 @@
 
 import com.google.common.base.MoreObjects;
 import java.io.Closeable;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -45,7 +45,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteGrpcPortReadOperation.class);
   private final Coder<WindowedValue<T>> coder;
   private final FnDataService beamFnDataService;
-  private final Supplier<String> bundleIdSupplier;
+  private final IdGenerator bundleIdSupplier;
   // Should only be set and cleared once per start/finish cycle in the start 
method and
   // finish method respectively.
   private String bundleId;
@@ -55,7 +55,7 @@
   public RemoteGrpcPortReadOperation(
       FnDataService beamFnDataService,
       Target target,
-      Supplier<String> bundleIdSupplier,
+      IdGenerator bundleIdSupplier,
       Coder<WindowedValue<T>> coder,
       OutputReceiver[] receivers,
       OperationContext context) {
@@ -69,7 +69,7 @@ public RemoteGrpcPortReadOperation(
   @Override
   public void start() throws Exception {
     try (Closeable scope = context.enterStart()) {
-      bundleId = bundleIdSupplier.get();
+      bundleId = bundleIdSupplier.getId();
       super.start();
       inboundDataClient =
           beamFnDataService.receive(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
index 9cf3dd4ad12a..bd625bdbd771 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
@@ -33,6 +33,7 @@
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -50,7 +51,7 @@
   private static final OutputReceiver[] EMPTY_RECEIVER_ARRAY = new 
OutputReceiver[0];
   private final Coder<WindowedValue<T>> coder;
   private final FnDataService beamFnDataService;
-  private final Supplier<String> bundleIdSupplier;
+  private final IdGenerator bundleIdSupplier;
   // Should only be set and cleared once per start/finish cycle in the start 
method and
   // finish method respectively.
   private String bundleId;
@@ -72,7 +73,7 @@
   public RemoteGrpcPortWriteOperation(
       FnDataService beamFnDataService,
       Target target,
-      Supplier<String> bundleIdSupplier,
+      IdGenerator bundleIdSupplier,
       Coder<WindowedValue<T>> coder,
       OperationContext context) {
     this(beamFnDataService, target, bundleIdSupplier, coder, context, 
System::currentTimeMillis);
@@ -81,7 +82,7 @@ public RemoteGrpcPortWriteOperation(
   public RemoteGrpcPortWriteOperation(
       FnDataService beamFnDataService,
       Target target,
-      Supplier<String> bundleIdSupplier,
+      IdGenerator bundleIdSupplier,
       Coder<WindowedValue<T>> coder,
       OperationContext context,
       Supplier<Long> currentTimeMillis) {
@@ -101,7 +102,7 @@ public void start() throws Exception {
       targetElementsSent = 1;
       elementsFlushed = 0;
       super.start();
-      bundleId = bundleIdSupplier.get();
+      bundleId = bundleIdSupplier.getId();
       receiver = beamFnDataService.send(LogicalEndpoint.of(bundleId, target), 
coder);
     }
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
index 1ca45e9ba7f5..2a71acc79803 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
@@ -39,6 +39,7 @@
 import 
org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import 
org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerator;
 
 /**
  * Splits the instruction graph into SDK and runner harness portions replacing 
the SDK sub-graphs
@@ -74,7 +75,7 @@
 public class CreateRegisterFnOperationFunction
     implements Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, 
Edge>> {
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final BiFunction<String, String, Node> portSupplier;
   private final Function<MutableNetwork<Node, Edge>, Node> 
registerFnOperationFunction;
 
@@ -90,7 +91,7 @@
    *     produces a {@link Node} that is able to register the SDK functions 
within the SDK harness.
    */
   public CreateRegisterFnOperationFunction(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       BiFunction<String, String, Node> portSupplier,
       Function<MutableNetwork<Node, Edge>, Node> registerFnOperationFunction) {
     this.idGenerator = idGenerator;
@@ -243,8 +244,8 @@ private Node rewireAcrossSdkRunnerPortNode(
         InstructionOutputNode.create(outputNode.getInstructionOutput());
     InstructionOutputNode portOutputNode =
         InstructionOutputNode.create(outputNode.getInstructionOutput());
-    String predecessorPortEdgeId = idGenerator.get();
-    String successorPortEdgeId = idGenerator.get();
+    String predecessorPortEdgeId = idGenerator.getId();
+    String successorPortEdgeId = idGenerator.getId();
     Node portNode = portSupplier.apply(predecessorPortEdgeId, 
successorPortEdgeId);
     network.addNode(newPredecessorOutputNode);
     network.addNode(portNode);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index bccb26a73437..8791fd40749d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -42,7 +42,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterRequest;
@@ -79,6 +78,7 @@
 import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
@@ -115,7 +115,7 @@
   private static final String SERIALIZED_SOURCE = "serialized_source";
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final Endpoints.ApiServiceDescriptor stateApiServiceDescriptor;
   private final @Nullable RunnerApi.Pipeline pipeline;
 
@@ -125,7 +125,7 @@
    */
   public static RegisterNodeFunction forPipeline(
       RunnerApi.Pipeline pipeline,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
     return new RegisterNodeFunction(pipeline, idGenerator, 
stateApiServiceDescriptor);
   }
@@ -136,13 +136,13 @@ public static RegisterNodeFunction forPipeline(
    * harnesses, then this method should be removed.
    */
   public static RegisterNodeFunction withoutPipeline(
-      Supplier<String> idGenerator, Endpoints.ApiServiceDescriptor 
stateApiServiceDescriptor) {
+      IdGenerator idGenerator, Endpoints.ApiServiceDescriptor 
stateApiServiceDescriptor) {
     return new RegisterNodeFunction(null, idGenerator, 
stateApiServiceDescriptor);
   }
 
   private RegisterNodeFunction(
       @Nullable RunnerApi.Pipeline pipeline,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
     this.pipeline = pipeline;
     this.idGenerator = idGenerator;
@@ -173,7 +173,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
             input.addEdge(
                 node,
                 successor,
-                MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag(idGenerator.get())));
+                MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag(idGenerator.getId())));
           }
         }
       }
@@ -185,7 +185,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
 
     ProcessBundleDescriptor.Builder processBundleDescriptor =
         ProcessBundleDescriptor.newBuilder()
-            .setId(idGenerator.get())
+            .setId(idGenerator.getId())
             .setStateApiServiceDescriptor(stateApiServiceDescriptor);
 
     // For intermediate PCollections we fabricate, we make a bogus 
WindowingStrategy
@@ -199,7 +199,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
       
sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
     }
 
-    String fakeWindowingStrategyId = "fakeWindowingStrategy" + 
idGenerator.get();
+    String fakeWindowingStrategyId = "fakeWindowingStrategy" + 
idGenerator.getId();
     try {
       RunnerApi.MessageWithComponents fakeWindowingStrategyProto =
           WindowingStrategyTranslation.toMessageProto(
@@ -224,7 +224,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
         Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
       InstructionOutput instructionOutput = node.getInstructionOutput();
 
-      String coderId = "generatedCoder" + idGenerator.get();
+      String coderId = "generatedCoder" + idGenerator.getId();
       try (ByteString.Output output = ByteString.newOutput()) {
         try {
           Coder<?> javaCoder =
@@ -259,7 +259,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
             e);
       }
 
-      String pcollectionId = "generatedPcollection" + idGenerator.get();
+      String pcollectionId = "generatedPcollection" + idGenerator.getId();
       processBundleDescriptor.putPcollections(
           pcollectionId,
           RunnerApi.PCollection.newBuilder()
@@ -273,7 +273,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
     for (ParallelInstructionNode node :
         Iterables.filter(input.nodes(), ParallelInstructionNode.class)) {
       ParallelInstruction parallelInstruction = node.getParallelInstruction();
-      String ptransformId = "generatedPtransform" + idGenerator.get();
+      String ptransformId = "generatedPtransform" + idGenerator.getId();
       ptransformIdToNameContexts.put(
           ptransformId,
           NameContext.create(
@@ -383,7 +383,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
 
       for (Node predecessorOutput : input.predecessors(node)) {
         pTransform.putInputs(
-            "generatedInput" + idGenerator.get(), 
nodesToPCollections.get(predecessorOutput));
+            "generatedInput" + idGenerator.getId(), 
nodesToPCollections.get(predecessorOutput));
       }
 
       for (Edge edge : input.outEdges(node)) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
index eb44eaa93268..ae9abf891c17 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
@@ -34,10 +34,10 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -73,7 +73,7 @@ public void testWindowMapping() throws Exception {
 
     FnApiWindowMappingFn windowMappingFn =
         new FnApiWindowMappingFn(
-            IdGenerator::generate,
+            IdGenerators.decrementingLongs(),
             testSdkHarness,
             DATA_SERVICE,
             testSdkHarness,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 972c19a01122..be947c48276b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -67,7 +67,6 @@
 import 
org.apache.beam.runners.dataflow.worker.counters.Counter.CounterUpdateExtractor;
 import 
org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.MapTaskToNetworkFunction;
@@ -88,6 +87,8 @@
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -115,8 +116,10 @@
 public class IntrinsicMapTaskExecutorFactoryTest {
   private static final String STAGE = "test";
 
+  private static final IdGenerator idGenerator = 
IdGenerators.decrementingLongs();
+
   private static final Function<MapTask, MutableNetwork<Node, Edge>> 
mapTaskToNetwork =
-      new FixMultiOutputInfosOnParDoInstructions(IdGenerator::generate)
+      new FixMultiOutputInfosOnParDoInstructions(idGenerator)
           .andThen(new MapTaskToNetworkFunction());
 
   private static final CloudObject windowedStringCoder =
@@ -180,7 +183,7 @@ public void testCreateMapTaskExecutor() throws Exception {
             sinkRegistry,
             BatchModeExecutionContext.forTesting(options, counterSet, 
"testStage"),
             counterSet,
-            IdGenerator::generate)) {
+            idGenerator)) {
       // Safe covariant cast not expressible without rawtypes.
       @SuppressWarnings({"rawtypes", "unchecked"})
       List<Object> operations = (List) executor.operations;
@@ -271,7 +274,7 @@ public void testExecutionContextPlumbing() throws Exception 
{
             sinkRegistry,
             context,
             counterSet,
-            IdGenerator::generate)) {
+            idGenerator)) {
       executor.execute();
     }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java
index 7a2436947762..99fc74594539 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java
@@ -26,8 +26,7 @@
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -39,19 +38,10 @@
 public class FixMultiOutputInfosOnParDoInstructionsTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  private Supplier<String> makeIdGenerator() {
-    return makeIdGeneratorStartingFrom(-1);
-  }
-
-  private Supplier<String> makeIdGeneratorStartingFrom(long initialValue) {
-    AtomicLong longIdGenerator = new AtomicLong(initialValue);
-    return () -> Long.toString(longIdGenerator.getAndDecrement());
-  }
-
   @Test
   public void testExistingMultiOutputInfosAreUnmodified() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new FixMultiOutputInfosOnParDoInstructions(makeIdGenerator());
+        new 
FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     MapTask output = function.apply(createMapTaskWithParDo(2, "5", "6"));
     assertEquals(createMapTaskWithParDo(2, "5", "6"), output);
   }
@@ -59,7 +49,7 @@ public void testExistingMultiOutputInfosAreUnmodified() {
   @Test
   public void testDefaultOutputIsAddedIfOnlySingleOutput() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new FixMultiOutputInfosOnParDoInstructions(makeIdGenerator());
+        new 
FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     MapTask output = function.apply(createMapTaskWithParDo(1));
     assertEquals(createMapTaskWithParDo(1, "-1"), output);
   }
@@ -67,7 +57,7 @@ public void testDefaultOutputIsAddedIfOnlySingleOutput() {
   @Test
   public void testDefaultOutputHasDifferentIdsForEachMapTask() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new FixMultiOutputInfosOnParDoInstructions(makeIdGenerator());
+        new 
FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     MapTask output = function.apply(createMapTaskWithParDo(1));
     assertEquals(createMapTaskWithParDo(1, "-1"), output);
 
@@ -78,7 +68,7 @@ public void testDefaultOutputHasDifferentIdsForEachMapTask() {
   @Test
   public void testMissingTagsForMultipleOutputsThrows() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new 
FixMultiOutputInfosOnParDoInstructions(makeIdGeneratorStartingFrom(0));
+        new 
FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Invalid ParDoInstruction");
     thrown.expectMessage("2 outputs specified");
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/IdGeneratorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/IdGeneratorTest.java
deleted file mode 100644
index 2bfca08240d7..000000000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/IdGeneratorTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static org.junit.Assert.assertNotEquals;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link IdGenerator}. */
-@RunWith(JUnit4.class)
-public class IdGeneratorTest {
-  @Test
-  public void testGenerationNeverMatches() {
-    String previous = IdGenerator.generate();
-    for (int i = 0; i < 10000; ++i) {
-      String next = IdGenerator.generate();
-      assertNotEquals(previous, next);
-      previous = next;
-    }
-  }
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index 9bcb2242b2b4..dd07d994cea0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -32,8 +32,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
@@ -46,6 +44,7 @@
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.util.MoreFutures;
 import org.junit.Before;
@@ -103,8 +102,6 @@ public void setUp() {
 
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
   public void testTentativeUserMetrics() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
-
     final String stepName = "fakeStepNameWithUserMetrics";
     final String namespace = "sdk/whatever";
     final String name = "someCounter";
@@ -166,7 +163,7 @@ public void close() {}
 
     RegisterAndProcessBundleOperation processOperation =
         new RegisterAndProcessBundleOperation(
-            idGenerator,
+            IdGenerators.decrementingLongs(),
             instructionRequestHandler,
             mockBeamFnStateDelegator,
             REGISTER_REQUEST,
@@ -204,8 +201,6 @@ public void close() {}
   /** Tests that successive metric updates overwrite the previous. */
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
   public void testTentativeUserMetricsOverwrite() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
-
     final String stepName = "fakeStepNameWithUserMetrics";
     final String namespace = "sdk/whatever";
     final String name = "someCounter";
@@ -275,7 +270,7 @@ public void close() {}
 
     RegisterAndProcessBundleOperation processOperation =
         new RegisterAndProcessBundleOperation(
-            idGenerator,
+            IdGenerators.decrementingLongs(),
             instructionRequestHandler,
             mockBeamFnStateDelegator,
             REGISTER_REQUEST,
@@ -312,8 +307,6 @@ public void close() {}
 
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
   public void testFinalUserMetrics() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
-
     final String stepName = "fakeStepNameWithUserMetrics";
     final String namespace = "sdk/whatever";
     final String name = "someCounter";
@@ -391,7 +384,7 @@ public void close() {}
 
     RegisterAndProcessBundleOperation processOperation =
         new RegisterAndProcessBundleOperation(
-            idGenerator,
+            IdGenerators.decrementingLongs(),
             instructionRequestHandler,
             mockBeamFnStateDelegator,
             REGISTER_REQUEST,
@@ -430,11 +423,6 @@ public void close() {}
         contains(new 
CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
   }
 
-  private Supplier<String> makeIdGeneratorStartingFrom(long initialValue) {
-    AtomicLong longIdGenerator = new AtomicLong(initialValue);
-    return () -> Long.toString(longIdGenerator.getAndIncrement());
-  }
-
   private BeamFnApi.InstructionResponse.Builder 
responseFor(BeamFnApi.InstructionRequest request) {
     return 
BeamFnApi.InstructionResponse.newBuilder().setInstructionId(request.getInstructionId());
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 342216b5638b..8e24bf8603cb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -46,7 +46,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -63,7 +62,6 @@
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.DataflowPortabilityPCollectionView;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
@@ -71,6 +69,8 @@
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -140,15 +140,21 @@ public void abort() {
             });
   }
 
-  private Supplier<String> makeIdGeneratorStartingFrom(long initialValue) {
-    AtomicLong longIdGenerator = new AtomicLong(initialValue);
-    return () -> Long.toString(longIdGenerator.getAndIncrement());
+  private IdGenerator makeIdGeneratorStartingFrom(long initialValue) {
+    return new IdGenerator() {
+      AtomicLong longs = new AtomicLong(initialValue);
+
+      @Override
+      public String getId() {
+        return Long.toString(longs.getAndIncrement());
+      }
+    };
   }
 
   @Test
   public void testSupportsRestart() {
     new RegisterAndProcessBundleOperation(
-            IdGenerator::generate,
+            IdGenerators.decrementingLongs(),
             new InstructionRequestHandler() {
               @Override
              public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
@@ -173,7 +179,7 @@ public void close() {}
   @Test
   public void testRegisterOnlyOnFirstBundle() throws Exception {
     List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     RegisterAndProcessBundleOperation operation =
         new RegisterAndProcessBundleOperation(
             idGenerator,
@@ -243,7 +249,7 @@ public void close() {}
 
   @Test
   public void testTentativeUserMetrics() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
 
     CountDownLatch processBundleLatch = new CountDownLatch(1);
 
@@ -330,7 +336,7 @@ public void close() {}
   @Test
   public void testFinalUserMetrics() throws Exception {
     List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     CountDownLatch processBundleLatch = new CountDownLatch(1);
@@ -438,7 +444,7 @@ public void close() {}
   @Test
   public void testProcessingBundleBlocksOnFinish() throws Exception {
     List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
     RegisterAndProcessBundleOperation operation =
         new RegisterAndProcessBundleOperation(
@@ -510,7 +516,7 @@ public void close() {}
 
   @Test
   public void testProcessingBundleHandlesUserStateRequests() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     InMemoryStateInternals<ByteString> stateInternals =
@@ -620,7 +626,7 @@ public void close() {}
 
   @Test
   public void testProcessingBundleHandlesMultimapSideInputRequests() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     DataflowStepContext mockStepContext = mock(DataflowStepContext.class);
@@ -748,7 +754,7 @@ public boolean isEmpty() {
 
   @Test
   public void testAbortCancelsAndCleansUpDuringRegister() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     CountDownLatch waitForAbortToComplete = new CountDownLatch(1);
@@ -795,7 +801,7 @@ public void close() {}
 
   @Test
   public void testAbortCancelsAndCleansUpDuringProcessBundle() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     CountDownLatch waitForAbortToComplete = new CountDownLatch(1);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java
index ae417da6183c..d73980126ad6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java
@@ -30,7 +30,6 @@
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.worker.NameContextsForTests;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
@@ -39,6 +38,7 @@
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
@@ -68,7 +68,7 @@
   @Mock private FnDataService beamFnDataService;
   @Mock private OperationContext operationContext;
   @Captor private ArgumentCaptor<FnDataReceiver<WindowedValue<String>>> consumerCaptor;
-  @Mock private Supplier<String> bundleIdSupplier;
+  @Mock private IdGenerator bundleIdSupplier;
   private RemoteGrpcPortReadOperation<String> operation;
   private TestOutputReceiver testReceiver;
 
@@ -96,7 +96,7 @@ public void testSuccessfulProcessing() throws Exception {
     InboundDataClient inboundDataClient = CompletableFutureInboundDataClient.create();
     when(beamFnDataService.receive(any(), Matchers.<Coder<WindowedValue<String>>>any(), any()))
         .thenReturn(inboundDataClient);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
 
     operation.start();
     verify(beamFnDataService)
@@ -122,14 +122,14 @@ public void testSuccessfulProcessing() throws Exception {
     inboundDataClient.complete();
     operationFinish.get();
 
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
     assertThat(
         testReceiver.outputElems,
         contains(
             valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
 
     // Ensure that the old bundle id is cleared.
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID_2);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID_2);
     operation.start();
     verify(beamFnDataService)
         .receive(eq(LogicalEndpoint.of(BUNDLE_ID_2, TARGET)), eq(CODER), consumerCaptor.capture());
@@ -140,7 +140,7 @@ public void testStartAndAbort() throws Exception {
     InboundDataClient inboundDataClient = CompletableFutureInboundDataClient.create();
     when(beamFnDataService.receive(any(), Matchers.<Coder<WindowedValue<String>>>any(), any()))
         .thenReturn(inboundDataClient);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
 
     operation.start();
     verify(beamFnDataService)
@@ -149,6 +149,6 @@ public void testStartAndAbort() throws Exception {
     assertFalse(inboundDataClient.isDone());
     operation.abort();
     assertTrue(inboundDataClient.isDone());
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
index 727413210781..404e71885387 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
@@ -31,12 +31,12 @@
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -59,7 +59,7 @@
   private static final String BUNDLE_ID = "999";
   private static final String BUNDLE_ID_2 = "222";
 
-  @Mock Supplier<String> bundleIdSupplier;
+  @Mock IdGenerator bundleIdSupplier;
   @Mock private FnDataService beamFnDataService;
   @Mock private OperationContext operationContext;
   private RemoteGrpcPortWriteOperation<String> operation;
@@ -82,7 +82,7 @@ public void testSuccessfulProcessing() throws Exception {
     RecordingConsumer<WindowedValue<String>> recordingConsumer = new RecordingConsumer<>();
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
     operation.start();
     verify(beamFnDataService).send(LogicalEndpoint.of(BUNDLE_ID, TARGET), CODER);
     assertFalse(recordingConsumer.closed);
@@ -94,7 +94,7 @@ public void testSuccessfulProcessing() throws Exception {
 
     operation.finish();
     assertTrue(recordingConsumer.closed);
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
 
     assertThat(
         recordingConsumer,
@@ -102,7 +102,7 @@ public void testSuccessfulProcessing() throws Exception {
             valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
 
     // Ensure that the old bundle id is cleared.
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID_2);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID_2);
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
     operation.start();
@@ -116,7 +116,7 @@ public void testStartAndAbort() throws Exception {
     RecordingConsumer<WindowedValue<String>> recordingConsumer = new RecordingConsumer<>();
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
     operation.start();
     verify(beamFnDataService).send(LogicalEndpoint.of(BUNDLE_ID, TARGET), CODER);
     assertFalse(recordingConsumer.closed);
@@ -127,7 +127,7 @@ public void testStartAndAbort() throws Exception {
 
     operation.abort();
     assertTrue(recordingConsumer.closed);
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
 
     verifyNoMoreInteractions(beamFnDataService);
   }
@@ -156,7 +156,7 @@ public void testBufferRateLimiting() throws Exception {
     RecordingConsumer<WindowedValue<String>> recordingConsumer = new RecordingConsumer<>();
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
 
     Consumer<Integer> processedElementConsumer = operation.processedElementsConsumer();
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
index fe29bc3aad3a..cb8d55a7439d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
@@ -40,13 +40,13 @@
 import java.util.List;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.DefaultEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.HappensBeforeEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,7 +70,7 @@ public void setUp() {
     MockitoAnnotations.initMocks(this);
     createRegisterFnOperation =
         new CreateRegisterFnOperationFunction(
-            IdGenerator::generate, portSupplier, registerFnOperationFunction);
+            IdGenerators.decrementingLongs(), portSupplier, registerFnOperationFunction);
   }
 
   @Test
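
For context when reading the patch above, the sketch below summarizes the shape of the migration: ad-hoc Supplier<String> id generators are replaced by the shared IdGenerator interface from org.apache.beam.sdk.fn. It is an illustrative sketch only, not code from the PR: the class name IdGeneratorMigrationSketch and the helper startingFrom are invented for the example, while IdGenerator, its getId() method, and IdGenerators.decrementingLongs() are taken directly from the diff.

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.beam.sdk.fn.IdGenerator;
    import org.apache.beam.sdk.fn.IdGenerators;

    /** Illustrative sketch of the id-generation API this change migrates to. */
    public class IdGeneratorMigrationSketch {

      // Shared-library generator, used by the worker code in this patch in place of
      // ad-hoc java.util.function.Supplier<String> lambdas backed by an AtomicLong.
      private static final IdGenerator ID_GENERATOR = IdGenerators.decrementingLongs();

      // Mirrors the updated makeIdGeneratorStartingFrom(long) test helper: a generator
      // that counts upward from a fixed starting value, useful for deterministic tests.
      static IdGenerator startingFrom(long initialValue) {
        return new IdGenerator() {
          private final AtomicLong longs = new AtomicLong(initialValue);

          @Override
          public String getId() {
            return Long.toString(longs.getAndIncrement());
          }
        };
      }

      public static void main(String[] args) {
        // Callers now obtain ids through getId() rather than Supplier.get().
        System.out.println(ID_GENERATOR.getId());
        System.out.println(startingFrom(777L).getId()); // prints "777"
      }
    }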


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 172117)
    Time Spent: 40m  (was: 0.5h)

> Replace duplicated runner IdGenerator code with IdGenerator from shared lib
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-6180
>                 URL: https://issues.apache.org/jira/browse/BEAM-6180
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-dataflow
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
