m-trieu commented on code in PR #28537:
URL: https://github.com/apache/beam/pull/28537#discussion_r1333650700


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -170,106 +191,35 @@ public class StreamingDataflowWorker {
   private static final Function<MapTask, MutableNetwork<Node, Edge>> 
mapTaskToBaseNetwork =
       new MapTaskToNetworkFunction(idGenerator);
 
-  private static Random clientIdGenerator = new Random();
-
-  // Maximum number of threads for processing.  Currently each thread 
processes one key at a time.
-  static final int MAX_PROCESSING_THREADS = 300;
-  static final long THREAD_EXPIRATION_TIME_SEC = 60;
-  static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20;
-  static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB
-  static final int NUM_COMMIT_STREAMS = 1;
-  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
-  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
-
   private static final int DEFAULT_STATUS_PORT = 8081;
-
   // Maximum size of the result of a GetWork request.
   private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m
-
   // Reserved ID for counter updates.
   // Matches kWindmillCounterUpdate in workflow_worker_service_multi_hubs.cc.
   private static final String WINDMILL_COUNTER_UPDATE_WORK_ID = "3";
-
   /** Maximum number of failure stacktraces to report in each update sent to 
backend. */
   private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;
 
-  // TODO(https://github.com/apache/beam/issues/19632): Update throttling 
counters to use generic
-  // throttling-msecs metric.
-  public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME =
-      MetricName.named(
-          
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
-          "throttling-msecs");
-
   private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = 
Duration.standardMinutes(5);
-
-  /** Returns whether an exception was caused by a {@link OutOfMemoryError}. */
-  private static boolean isOutOfMemoryError(Throwable t) {
-    while (t != null) {
-      if (t instanceof OutOfMemoryError) {
-        return true;
-      }
-      t = t.getCause();
-    }
-    return false;
-  }
-
-  private static MapTask parseMapTask(String input) throws IOException {
-    return Transport.getJsonFactory().fromString(input, MapTask.class);
-  }
-
-  public static void main(String[] args) throws Exception {
-    JvmInitializers.runOnStartup();
-
-    
DataflowWorkerHarnessHelper.initializeLogging(StreamingDataflowWorker.class);
-    DataflowWorkerHarnessOptions options =
-        DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
-            StreamingDataflowWorker.class);
-    DataflowWorkerHarnessHelper.configureLogging(options);
-    checkArgument(
-        options.isStreaming(),
-        "%s instantiated with options indicating batch use",
-        StreamingDataflowWorker.class.getName());
-
-    checkArgument(
-        !DataflowRunner.hasExperiment(options, "beam_fn_api"),
-        "%s cannot be main() class with beam_fn_api enabled",
-        StreamingDataflowWorker.class.getSimpleName());
-
-    StreamingDataflowWorker worker =
-        StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options);
-
-    // Use the MetricsLogger container which is used by BigQueryIO to 
periodically log process-wide
-    // metrics.
-    MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
-
-    JvmInitializers.runBeforeProcessing(options);
-    worker.startStatusPages();
-    worker.start();
-  }
-
+  private static final Random clientIdGenerator = new Random();
+  final WindmillStateCache stateCache;

Review Comment:
   Also, for my own knowledge: why don't we use a dependency-injection framework here, such as
Guice, or Dagger if we want to avoid Guice's runtime overhead? @scwhittle



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to