m-trieu commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1585824700


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -387,24 +393,53 @@ private StreamingDataflowWorker(
     LOG.debug("WindmillServiceEndpoint: {}", 
options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
-    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
+    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
   }
 
   public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions options) {
     long clientId = clientIdGenerator.nextLong();
     MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
-    ConcurrentMap<String, ComputationState> computationMap = new 
ConcurrentHashMap<>();
     ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
+    ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
     StreamingCounters streamingCounters = StreamingCounters.create();
-
+    WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, 
LOG);
+    BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
+    AtomicInteger maxWorkItemCommitBytes = new 
AtomicInteger(Integer.MAX_VALUE);
+    WindmillStateCache windmillStateCache =
+        WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
+    Function<String, ScheduledExecutorService> executorSupplier =
+        threadName ->
+            Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat(threadName).build());
     GrpcWindmillStreamFactory windmillStreamFactory =
         createWindmillStreamFactory(options, clientId);
-    WindmillServerStub windmillServer =
-        createWindmillServerStub(
-            options,
-            windmillStreamFactory,
-            new WorkHeartbeatResponseProcessor(
-                computationId -> 
Optional.ofNullable(computationMap.get(computationId))));
+    WindmillServerStub windmillServer = createWindmillServerStub(options, 
windmillStreamFactory);
+    ComputationConfig.Fetcher configFetcher =
+        options.isEnableStreamingEngine()
+            ? StreamingEngineConfigFetcher.forTesting(
+                true,
+                options.getGlobalConfigRefreshPeriod().getMillis(),
+                dataflowServiceClient,
+                executorSupplier,
+                FIX_MULTI_OUTPUT_INFOS_ON_PAR_DO_INSTRUCTIONS,
+                config ->
+                    onPipelineConfig(
+                        config,
+                        stateNameMap,
+                        windmillServer::setWindmillServiceEndpoints,
+                        maxWorkItemCommitBytes))
+            : new StreamingApplianceConfigFetcher(
+                windmillServer,
+                config -> consumeUserStepToStateFamilyName(config, 
stateNameMap),
+                FIX_MULTI_OUTPUT_INFOS_ON_PAR_DO_INSTRUCTIONS);
+    ComputationStateCache computationStateCache =
+        ComputationStateCache.create(
+            configFetcher, workExecutor, windmillStateCache::forComputation);
+    if (windmillServer instanceof GrpcWindmillServer) {

Review Comment:
   sgtm agreed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to