m-trieu commented on code in PR #28537:
URL: https://github.com/apache/beam/pull/28537#discussion_r1333643977
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -170,106 +191,35 @@ public class StreamingDataflowWorker {
       private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
           new MapTaskToNetworkFunction(idGenerator);
-  private static Random clientIdGenerator = new Random();
-
-  // Maximum number of threads for processing. Currently each thread processes one key at a time.
-  static final int MAX_PROCESSING_THREADS = 300;
-  static final long THREAD_EXPIRATION_TIME_SEC = 60;
-  static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20;
-  static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB
-  static final int NUM_COMMIT_STREAMS = 1;
-  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
-  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
-
   private static final int DEFAULT_STATUS_PORT = 8081;
-
   // Maximum size of the result of a GetWork request.
   private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m
-
   // Reserved ID for counter updates.
   // Matches kWindmillCounterUpdate in workflow_worker_service_multi_hubs.cc.
   private static final String WINDMILL_COUNTER_UPDATE_WORK_ID = "3";
-
   /** Maximum number of failure stacktraces to report in each update sent to backend. */
   private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;
-  // TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use generic
-  // throttling-msecs metric.
-  public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME =
-      MetricName.named(
-          "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
-          "throttling-msecs");
-
   private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION =
       Duration.standardMinutes(5);
-
-  /** Returns whether an exception was caused by a {@link OutOfMemoryError}. */
-  private static boolean isOutOfMemoryError(Throwable t) {
-    while (t != null) {
-      if (t instanceof OutOfMemoryError) {
-        return true;
-      }
-      t = t.getCause();
-    }
-    return false;
-  }
-
-  private static MapTask parseMapTask(String input) throws IOException {
-    return Transport.getJsonFactory().fromString(input, MapTask.class);
-  }
-
-  public static void main(String[] args) throws Exception {
-    JvmInitializers.runOnStartup();
-
-    DataflowWorkerHarnessHelper.initializeLogging(StreamingDataflowWorker.class);
-    DataflowWorkerHarnessOptions options =
-        DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
-            StreamingDataflowWorker.class);
-    DataflowWorkerHarnessHelper.configureLogging(options);
-    checkArgument(
-        options.isStreaming(),
-        "%s instantiated with options indicating batch use",
-        StreamingDataflowWorker.class.getName());
-
-    checkArgument(
-        !DataflowRunner.hasExperiment(options, "beam_fn_api"),
-        "%s cannot be main() class with beam_fn_api enabled",
-        StreamingDataflowWorker.class.getSimpleName());
-
-    StreamingDataflowWorker worker =
-        StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options);
-
-    // Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide
-    // metrics.
-    MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
-
-    JvmInitializers.runBeforeProcessing(options);
-    worker.startStatusPages();
-    worker.start();
-  }
-
+  private static final Random clientIdGenerator = new Random();
+  final WindmillStateCache stateCache;
Review Comment:
This field is accessed in StreamingDataflowWorkerTest.testMergeWindowsCaching;
I can mark it as `@VisibleForTesting`.
I plan to do some breakup/cleanup of StreamingDataflowWorker in a later CL.
If we opt to use dependency injection and inject the cache for tests, we can
make all of this private.
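
For illustration, a minimal sketch of the two options mentioned above. The class name and constructor are hypothetical stand-ins, not the actual StreamingDataflowWorker API, and the import paths may differ (Beam vendors its own Guava copy):

```java
import com.google.common.annotations.VisibleForTesting; // Beam itself uses a vendored Guava

import org.apache.beam.runners.dataflow.worker.WindmillStateCache;

public class WorkerSketch {

  // Option 1: keep the package-private field, but annotate it so readers know
  // it is exposed only for StreamingDataflowWorkerTest.
  @VisibleForTesting final WindmillStateCache stateCache;

  // Option 2: inject the cache through the constructor. Tests then supply
  // their own cache instance, and the field could be made private instead.
  WorkerSketch(WindmillStateCache stateCache) {
    this.stateCache = stateCache;
  }
}
```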