m-trieu commented on code in PR #28537:
URL: https://github.com/apache/beam/pull/28537#discussion_r1333647955


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -170,106 +191,35 @@ public class StreamingDataflowWorker {
   private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
       new MapTaskToNetworkFunction(idGenerator);
 
-  private static Random clientIdGenerator = new Random();
-
-  // Maximum number of threads for processing.  Currently each thread processes one key at a time.
-  static final int MAX_PROCESSING_THREADS = 300;
-  static final long THREAD_EXPIRATION_TIME_SEC = 60;
-  static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20;
-  static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB
-  static final int NUM_COMMIT_STREAMS = 1;
-  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
-  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
-
   private static final int DEFAULT_STATUS_PORT = 8081;
-
   // Maximum size of the result of a GetWork request.
   private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m
-
   // Reserved ID for counter updates.
   // Matches kWindmillCounterUpdate in workflow_worker_service_multi_hubs.cc.
   private static final String WINDMILL_COUNTER_UPDATE_WORK_ID = "3";
-
   /** Maximum number of failure stacktraces to report in each update sent to backend. */
   private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;
 
-  // TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use generic
-  // throttling-msecs metric.
-  public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME =
-      MetricName.named(
-          "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
-          "throttling-msecs");
-
   private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = Duration.standardMinutes(5);
-
-  /** Returns whether an exception was caused by a {@link OutOfMemoryError}. */
-  private static boolean isOutOfMemoryError(Throwable t) {
-    while (t != null) {
-      if (t instanceof OutOfMemoryError) {
-        return true;
-      }
-      t = t.getCause();
-    }
-    return false;
-  }
-
-  private static MapTask parseMapTask(String input) throws IOException {
-    return Transport.getJsonFactory().fromString(input, MapTask.class);
-  }
-
-  public static void main(String[] args) throws Exception {
-    JvmInitializers.runOnStartup();
-
-    DataflowWorkerHarnessHelper.initializeLogging(StreamingDataflowWorker.class);
-    DataflowWorkerHarnessOptions options =
-        DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
-            StreamingDataflowWorker.class);
-    DataflowWorkerHarnessHelper.configureLogging(options);
-    checkArgument(
-        options.isStreaming(),
-        "%s instantiated with options indicating batch use",
-        StreamingDataflowWorker.class.getName());
-
-    checkArgument(
-        !DataflowRunner.hasExperiment(options, "beam_fn_api"),
-        "%s cannot be main() class with beam_fn_api enabled",
-        StreamingDataflowWorker.class.getSimpleName());
-
-    StreamingDataflowWorker worker =
-        StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options);
-
-    // Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide
-    // metrics.
-    MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
-
-    JvmInitializers.runBeforeProcessing(options);
-    worker.startStatusPages();
-    worker.start();
-  }
-
+  private static final Random clientIdGenerator = new Random();
+  final WindmillStateCache stateCache;

Review Comment:
   I was thinking we could eventually split this into two classes, StreamingApplianceDataflowWorker and StreamingEngineDataflowWorker. StreamingDataflowWorker would then instantiate one or the other based on the StreamingDataflowWorkerOptions passed in, and each implementation could have its own tests.
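A minimal sketch of the proposed split, assuming a hypothetical `WorkerOptions` interface with an `isEnableStreamingEngine()` flag as a stand-in for the real StreamingDataflowWorkerOptions (the actual option names may differ). `StreamingWorker`, `fromOptions`, and `backendName()` are illustrative names, not existing Beam APIs:

```java
import java.util.Objects;

/** Hypothetical stand-in for the worker options; not the real Beam interface. */
interface WorkerOptions {
  boolean isEnableStreamingEngine();
}

/** Hypothetical common surface shared by both backend-specific workers. */
interface StreamingWorker {
  void start();

  String backendName();
}

final class StreamingApplianceDataflowWorker implements StreamingWorker {
  @Override
  public void start() {
    // Appliance-only setup would live here.
  }

  @Override
  public String backendName() {
    return "appliance";
  }
}

final class StreamingEngineDataflowWorker implements StreamingWorker {
  @Override
  public void start() {
    // Streaming Engine-only setup would live here.
  }

  @Override
  public String backendName() {
    return "streaming-engine";
  }
}

final class StreamingDataflowWorker {
  private StreamingDataflowWorker() {}

  /** Chooses the implementation once, based on the options passed in. */
  static StreamingWorker fromOptions(WorkerOptions options) {
    Objects.requireNonNull(options, "options");
    return options.isEnableStreamingEngine()
        ? new StreamingEngineDataflowWorker()
        : new StreamingApplianceDataflowWorker();
  }
}
```

Because the branch on options happens in a single place, neither worker needs to check which backend it is running against, and each can be tested in isolation.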
   
   We can inject all dependencies through the constructors and provide static factory methods (or a factory class) that build the configurations we need, e.g. one for production and one for tests.
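A minimal sketch of constructor injection with production and test factory methods, assuming a hypothetical `WorkCommitter` dependency. `create`, `forTesting`, `finishWork`, and `RecordingCommitter` are illustrative names; the value 300 mirrors the MAX_PROCESSING_THREADS constant removed in the diff above, and 1 is an arbitrary test value:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical dependency that sends completed work back to the backend. */
interface WorkCommitter {
  void commit(String workId);
}

final class StreamingEngineDataflowWorker {
  private final WorkCommitter committer;
  private final int maxProcessingThreads;

  // Every dependency arrives through the constructor; nothing is created internally.
  StreamingEngineDataflowWorker(WorkCommitter committer, int maxProcessingThreads) {
    this.committer = Objects.requireNonNull(committer, "committer");
    if (maxProcessingThreads <= 0) {
      throw new IllegalArgumentException("maxProcessingThreads must be positive");
    }
    this.maxProcessingThreads = maxProcessingThreads;
  }

  /** Production shape: 300 threads, mirroring MAX_PROCESSING_THREADS. */
  static StreamingEngineDataflowWorker create(WorkCommitter committer) {
    return new StreamingEngineDataflowWorker(committer, 300);
  }

  /** Test shape: a single thread and a caller-supplied fake committer. */
  static StreamingEngineDataflowWorker forTesting(WorkCommitter fakeCommitter) {
    return new StreamingEngineDataflowWorker(fakeCommitter, 1);
  }

  void finishWork(String workId) {
    committer.commit(workId);
  }

  int maxProcessingThreads() {
    return maxProcessingThreads;
  }
}

/** In-memory fake that records commits so a test can assert on them. */
final class RecordingCommitter implements WorkCommitter {
  private final List<String> committed = new ArrayList<>();

  @Override
  public void commit(String workId) {
    committed.add(workId);
  }

  List<String> committed() {
    return Collections.unmodifiableList(committed);
  }
}
```

Tests can then exercise the worker against the fake without touching any real backend, while production code goes through `create`.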
   
   That way, future code changes would never need to touch the appliance-specific code paths or classes.



-- 
This is an automated message from the Apache Git Service.