m-trieu commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1585844460
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -387,24 +393,53 @@ private StreamingDataflowWorker(
LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
- LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
+ LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
}
public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
long clientId = clientIdGenerator.nextLong();
MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
- ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
+ ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
StreamingCounters streamingCounters = StreamingCounters.create();
-
+ WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
+ BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
+ AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(Integer.MAX_VALUE);
+ WindmillStateCache windmillStateCache =
+ WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
+ Function<String, ScheduledExecutorService> executorSupplier =
+ threadName ->
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat(threadName).build());
GrpcWindmillStreamFactory windmillStreamFactory =
createWindmillStreamFactory(options, clientId);
- WindmillServerStub windmillServer =
- createWindmillServerStub(
- options,
- windmillStreamFactory,
- new WorkHeartbeatResponseProcessor(
- computationId -> Optional.ofNullable(computationMap.get(computationId))));
+ WindmillServerStub windmillServer = createWindmillServerStub(options, windmillStreamFactory);
+ ComputationConfig.Fetcher configFetcher =
+ options.isEnableStreamingEngine()
+ ? StreamingEngineConfigFetcher.forTesting(
+ true,
+ options.getGlobalConfigRefreshPeriod().getMillis(),
+ dataflowServiceClient,
+ executorSupplier,
+ FIX_MULTI_OUTPUT_INFOS_ON_PAR_DO_INSTRUCTIONS,
+ config ->
+ onPipelineConfig(
+ config,
+ stateNameMap,
+ windmillServer::setWindmillServiceEndpoints,
+ maxWorkItemCommitBytes))
+ : new StreamingApplianceConfigFetcher(
+ windmillServer,
+ config -> consumeUserStepToStateFamilyName(config, stateNameMap),
+ FIX_MULTI_OUTPUT_INFOS_ON_PAR_DO_INSTRUCTIONS);
+ ComputationStateCache computationStateCache =
+ ComputationStateCache.create(
+ configFetcher, workExecutor, windmillStateCache::forComputation);
+ if (windmillServer instanceof GrpcWindmillServer) {
Review Comment:
I actually realized that the appliance does not refresh heartbeats, which is what that consumer of heartbeat responses is for.
We only refresh active work on the GetData stream when Streaming Engine is enabled.
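The point of the comment can be illustrated in isolation: since only the Streaming Engine path ever produces heartbeat responses, the consumer for them only needs to be wired on that path. This is a minimal, self-contained sketch under that assumption; `HeartbeatWiringSketch`, `FakeServerStub`, `HeartbeatResponse`, and `wire` are hypothetical names, not Beam APIs, and the real `WindmillServerStub` / `WorkHeartbeatResponseProcessor` wiring in this PR may look different.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch, not Beam's API: install a heartbeat-response consumer only when
// Streaming Engine is enabled, because the appliance never refreshes heartbeats.
final class HeartbeatWiringSketch {

  /** Hypothetical stand-in for a heartbeat response naming a computation and work token. */
  static final class HeartbeatResponse {
    final String computationId;
    final long workToken;

    HeartbeatResponse(String computationId, long workToken) {
      this.computationId = computationId;
      this.workToken = workToken;
    }
  }

  /** Hypothetical stand-in for a server stub; the real WindmillServerStub differs. */
  static final class FakeServerStub {
    private Optional<Consumer<List<HeartbeatResponse>>> heartbeatConsumer = Optional.empty();

    void setHeartbeatConsumer(Consumer<List<HeartbeatResponse>> consumer) {
      heartbeatConsumer = Optional.of(consumer);
    }

    boolean hasHeartbeatConsumer() {
      return heartbeatConsumer.isPresent();
    }

    /** Hands responses to the consumer if one was installed; otherwise they are dropped. */
    void deliver(List<HeartbeatResponse> responses) {
      heartbeatConsumer.ifPresent(consumer -> consumer.accept(responses));
    }
  }

  /** Builds the stub, installing the heartbeat consumer only on the Streaming Engine path. */
  static FakeServerStub wire(boolean enableStreamingEngine, List<HeartbeatResponse> sink) {
    FakeServerStub stub = new FakeServerStub();
    if (enableStreamingEngine) {
      // Streaming Engine refreshes active work over the GetData stream, so responses arrive.
      stub.setHeartbeatConsumer(sink::addAll);
    }
    // Appliance path: no consumer, since no heartbeat responses are ever produced there.
    return stub;
  }

  public static void main(String[] args) {
    List<HeartbeatResponse> seen = new ArrayList<>();
    FakeServerStub streamingEngine = wire(true, seen);
    streamingEngine.deliver(
        Collections.singletonList(new HeartbeatResponse("computation-1", 42L)));
    System.out.println("Streaming Engine consumer installed: " + streamingEngine.hasHeartbeatConsumer());
    System.out.println("Appliance consumer installed: " + wire(false, seen).hasHeartbeatConsumer());
  }
}
```

Making the consumer optional at construction time, rather than always installing a no-op, keeps the appliance path from carrying wiring that can never fire.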
--
This is an automated message from the Apache Git Service.