m-trieu commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1585964743


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -387,24 +393,53 @@ private StreamingDataflowWorker(
     LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
-    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
+    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
   }
 
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     long clientId = clientIdGenerator.nextLong();
     MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
-    ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();
     ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
+    ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
     StreamingCounters streamingCounters = StreamingCounters.create();
-
+    WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
+    BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
+    AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(Integer.MAX_VALUE);
+    WindmillStateCache windmillStateCache =
+        WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
+    Function<String, ScheduledExecutorService> executorSupplier =
+        threadName ->
+            Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat(threadName).build());
     GrpcWindmillStreamFactory windmillStreamFactory =
         createWindmillStreamFactory(options, clientId);
-    WindmillServerStub windmillServer =
-        createWindmillServerStub(
-            options,
-            windmillStreamFactory,
-            new WorkHeartbeatResponseProcessor(
-                computationId -> Optional.ofNullable(computationMap.get(computationId))));
+    WindmillServerStub windmillServer = createWindmillServerStub(options, windmillStreamFactory);
+    ComputationConfig.Fetcher configFetcher =
+        options.isEnableStreamingEngine()
+            ? StreamingEngineConfigFetcher.forTesting(
+                true,
+                options.getGlobalConfigRefreshPeriod().getMillis(),
+                dataflowServiceClient,
+                executorSupplier,
+                FIX_MULTI_OUTPUT_INFOS_ON_PAR_DO_INSTRUCTIONS,
+                config ->
+                    onPipelineConfig(
+                        config,
+                        stateNameMap,
+                        windmillServer::setWindmillServiceEndpoints,
+                        maxWorkItemCommitBytes))
+            : new StreamingApplianceConfigFetcher(
+                windmillServer,
+                config -> consumeUserStepToStateFamilyName(config, stateNameMap),
+                FIX_MULTI_OUTPUT_INFOS_ON_PAR_DO_INSTRUCTIONS);
+    ComputationStateCache computationStateCache =
+        ComputationStateCache.create(
+            configFetcher, workExecutor, windmillStateCache::forComputation);
+    if (windmillServer instanceof GrpcWindmillServer) {

Review Comment:
   Done. I changed the way this was created. It's still not super clean, but it's much better than before.
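
   The creation pattern in the diff above picks a config fetcher based on `isEnableStreamingEngine()`. The Engine fetcher's callback then updates a shared `AtomicInteger` that is read elsewhere, as with `maxWorkItemCommitBytes`. Here is a minimal, self-contained sketch of that wiring. `ConfigFetcher`, `EngineConfigFetcher`, `ApplianceConfigFetcher`, `PipelineConfig`, and the 100 MiB limit are hypothetical stand-ins, not the real Beam classes or defaults.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ConfigFetcherSelectionSketch {
  // Hypothetical stand-in for the pipeline config the backend delivers.
  static final class PipelineConfig {
    final int maxWorkItemCommitBytes;

    PipelineConfig(int maxWorkItemCommitBytes) {
      this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
    }
  }

  // Hypothetical stand-in for ComputationConfig.Fetcher.
  interface ConfigFetcher {
    void start();
  }

  // Engine-style fetcher: hands each new config to a callback.
  // In this sketch it "fetches" one hard-coded, hypothetical config.
  static final class EngineConfigFetcher implements ConfigFetcher {
    private final Consumer<PipelineConfig> onConfig;

    EngineConfigFetcher(Consumer<PipelineConfig> onConfig) {
      this.onConfig = onConfig;
    }

    @Override
    public void start() {
      onConfig.accept(new PipelineConfig(100 * 1024 * 1024));
    }
  }

  // Appliance-style fetcher: in this sketch it pushes no commit-size limit.
  static final class ApplianceConfigFetcher implements ConfigFetcher {
    @Override
    public void start() {}
  }

  // The fetcher is chosen once, up front. The callback only writes the shared
  // AtomicInteger, so other components just call get() on it.
  static ConfigFetcher createFetcher(boolean enableStreamingEngine, AtomicInteger maxCommitBytes) {
    return enableStreamingEngine
        ? new EngineConfigFetcher(config -> maxCommitBytes.set(config.maxWorkItemCommitBytes))
        : new ApplianceConfigFetcher();
  }

  public static void main(String[] args) {
    AtomicInteger maxCommitBytes = new AtomicInteger(Integer.MAX_VALUE);
    createFetcher(true, maxCommitBytes).start();
    System.out.println("maxWorkItemCommitBytes: " + maxCommitBytes.get());
  }
}
```

   Running `main` prints `maxWorkItemCommitBytes: 104857600`. With `false`, the value stays at `Integer.MAX_VALUE`, which mirrors the initial value in the diff.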
   
   I wonder if we can make `GrpcWindmillServer` not implement `WindmillServerStub`. I'm not sure it currently makes sense to group the Appliance and Engine client implementations together, and doing so leads to awkward situations like the `instanceof GrpcWindmillServer` check above.
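
   Here is a rough sketch of what splitting the clients might look like. It assumes two hypothetical interfaces, `ApplianceWindmillClient` and `StreamingEngineWindmillClient`, in place of a shared `WindmillServerStub`. The method signatures are simplified for illustration and are not the real Beam API. Because Engine-only wiring accepts the Engine type directly, the compiler rules out an Appliance client and no `instanceof` check is needed.

```java
import java.util.List;

public class WindmillClientSplitSketch {
  // Hypothetical: only what the Appliance path needs (simplified).
  interface ApplianceWindmillClient {
    String getConfig(String computationId);
  }

  // Hypothetical: only what the Streaming Engine (gRPC) path needs.
  // Signature simplified; not the real setWindmillServiceEndpoints.
  interface StreamingEngineWindmillClient {
    void setWindmillServiceEndpoints(List<String> endpoints);

    List<String> windmillServiceEndpoints();
  }

  // Fake Engine client that just records the endpoints it was given.
  static final class FakeGrpcWindmillClient implements StreamingEngineWindmillClient {
    private List<String> endpoints = List.of();

    @Override
    public void setWindmillServiceEndpoints(List<String> endpoints) {
      this.endpoints = List.copyOf(endpoints);
    }

    @Override
    public List<String> windmillServiceEndpoints() {
      return endpoints;
    }
  }

  // Fake Appliance client that returns a canned config string.
  static final class FakeApplianceWindmillClient implements ApplianceWindmillClient {
    @Override
    public String getConfig(String computationId) {
      return "config-for-" + computationId;
    }
  }

  // Engine-only wiring is typed to the Engine client, so it can never be
  // handed an Appliance client. No instanceof check is required.
  static void onEndpointsUpdated(StreamingEngineWindmillClient client, List<String> endpoints) {
    client.setWindmillServiceEndpoints(endpoints);
  }

  // Appliance-only wiring is likewise typed to its own client.
  static String loadApplianceConfig(ApplianceWindmillClient client, String computationId) {
    return client.getConfig(computationId);
  }

  public static void main(String[] args) {
    FakeGrpcWindmillClient engine = new FakeGrpcWindmillClient();
    onEndpointsUpdated(engine, List.of("windmill-0:443", "windmill-1:443"));
    System.out.println(engine.windmillServiceEndpoints());
    System.out.println(loadApplianceConfig(new FakeApplianceWindmillClient(), "stage-1"));
  }
}
```

   If the two paths do share some operations, those could move into a small common interface that both extend. Engine-specific methods such as endpoint updates would stay out of it.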

