m-trieu commented on code in PR #28537:
URL: https://github.com/apache/beam/pull/28537#discussion_r1333645495


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -301,69 +248,35 @@ public static void main(String[] args) throws Exception {
   private final Counter<Long, Long> timeAtMaxActiveThreads;
   private final Counter<Integer, Integer> windmillMaxObservedWorkItemCommitBytes;
   private final Counter<Integer, Integer> memoryThrashing;
-  private ScheduledExecutorService refreshWorkTimer;
-  private ScheduledExecutorService statusPageTimer;
-
   private final boolean publishCounters;
-  private ScheduledExecutorService globalWorkerUpdatesTimer;
-  private int retryLocallyDelayMs = 10000;
-
-  // Periodically fires a global config request to dataflow service. Only used when windmill service
-  // is enabled.
-  private ScheduledExecutorService globalConfigRefreshTimer;
-
   private final MemoryMonitor memoryMonitor;
   private final Thread memoryMonitorThread;
-
   private final WorkerStatusPages statusPages;
-  // Periodic sender of debug information to the debug capture service.
-  private DebugCapture.Manager debugCaptureManager = null;
-
   // Limit on bytes sinked (committed) in a work item.
   private final long maxSinkBytes; // = MAX_SINK_BYTES unless disabled in options.
-  // Possibly overridden by streaming engine config.
-  private int maxWorkItemCommitBytes = Integer.MAX_VALUE;
-
   private final EvictingQueue<String> pendingFailuresToReport =
-      EvictingQueue.<String>create(MAX_FAILURES_TO_REPORT_IN_UPDATE);
-
+      EvictingQueue.create(MAX_FAILURES_TO_REPORT_IN_UPDATE);
   private final ReaderCache readerCache;
-
   private final WorkUnitClient workUnitClient;
   private final CompletableFuture<Void> isDoneFuture;
   private final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork;
-
-  /**
-   * Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the amount of data sinked
-   * (across all the sinks, if there are more than one) reaches this limit. This serves as hint for
-   * readers to stop producing more. This can be disabled with 'disable_limiting_bundle_sink_bytes'
-   * experiment.
-   */
-  static final int MAX_SINK_BYTES = 10_000_000;
-
   private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
   private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();
-
-  private HotKeyLogger hotKeyLogger;
-
   private final Supplier<Instant> clock;
   private final Function<String, ScheduledExecutorService> executorSupplier;
-
-  public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
-      DataflowWorkerHarnessOptions options) throws IOException {
-
-    return new StreamingDataflowWorker(
-        Collections.emptyList(),
-        IntrinsicMapTaskExecutorFactory.defaultFactory(),
-        new DataflowWorkUnitClient(options, LOG),
-        options.as(StreamingDataflowWorkerOptions.class),
-        true,
-        new HotKeyLogger(),
-        Instant::now,
-        (threadName) ->
-            Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder().setNameFormat(threadName).build()));
-  }
+  private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
+  private final HotKeyLogger hotKeyLogger;
+  private ScheduledExecutorService refreshWorkTimer;
+  private ScheduledExecutorService statusPageTimer;
+  private ScheduledExecutorService globalWorkerUpdatesTimer;
+  private int retryLocallyDelayMs = 10000;
+  // Periodically fires a global config request to dataflow service. Only used when windmill service
+  // is enabled.
+  private ScheduledExecutorService globalConfigRefreshTimer;
+  // Periodic sender of debug information to the debug capture service.
+  private DebugCapture.Manager debugCaptureManager = null;

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to