m-trieu commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1585820746


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -387,24 +393,53 @@ private StreamingDataflowWorker(
     LOG.debug("WindmillServiceEndpoint: {}", 
options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
-    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
+    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
   }
 
   public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions options) {
     long clientId = clientIdGenerator.nextLong();
     MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
-    ConcurrentMap<String, ComputationState> computationMap = new 
ConcurrentHashMap<>();
     ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
+    ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
     StreamingCounters streamingCounters = StreamingCounters.create();
-
+    WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, 
LOG);
+    BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
+    AtomicInteger maxWorkItemCommitBytes = new 
AtomicInteger(Integer.MAX_VALUE);
+    WindmillStateCache windmillStateCache =
+        WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
+    Function<String, ScheduledExecutorService> executorSupplier =
+        threadName ->
+            Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat(threadName).build());
     GrpcWindmillStreamFactory windmillStreamFactory =
         createWindmillStreamFactory(options, clientId);
-    WindmillServerStub windmillServer =
-        createWindmillServerStub(
-            options,
-            windmillStreamFactory,
-            new WorkHeartbeatResponseProcessor(
-                computationId -> 
Optional.ofNullable(computationMap.get(computationId))));
+    WindmillServerStub windmillServer = createWindmillServerStub(options, 
windmillStreamFactory);
+    ComputationConfig.Fetcher configFetcher =
+        options.isEnableStreamingEngine()
+            ? StreamingEngineConfigFetcher.forTesting(

Review Comment:
   removed this was supposed to just use the create method 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to