m-trieu commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1598852745
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -800,110 +805,39 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args
}
private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Supplier<Instant> clock,
- Function<String, ScheduledExecutorService> executorSupplier,
- int localRetryTimeoutMs) {
- return makeWorker(
- ImmutableMap.of(),
- instructions,
- options,
- publishCounters,
- clock,
- executorSupplier,
- localRetryTimeoutMs);
- }
-
- private StreamingDataflowWorker makeWorker(
- Map<String, String> stateNameMappings,
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Supplier<Instant> clock,
- Function<String, ScheduledExecutorService> executorSupplier,
- int localRetryTimeoutMs) {
- ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
- stateNameMap.putAll(stateNameMappings);
- stateNameMap.putAll(ImmutableMap.of(DEFAULT_PARDO_USER_NAME, DEFAULT_PARDO_STATE_FAMILY));
+ StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) {
StreamingDataflowWorker worker =
StreamingDataflowWorker.forTesting(
- stateNameMap,
+ streamingDataflowWorkerTestParams.stateNameMappings(),
server,
- Collections.singletonList(defaultMapTask(instructions)),
+ Collections.singletonList(
+ defaultMapTask(streamingDataflowWorkerTestParams.instructions())),
IntrinsicMapTaskExecutorFactory.defaultFactory(),
mockWorkUnitClient,
- options,
- publishCounters,
+ streamingDataflowWorkerTestParams.options(),
+ streamingDataflowWorkerTestParams.publishCounters(),
hotKeyLogger,
- clock,
- executorSupplier,
- localRetryTimeoutMs);
+ streamingDataflowWorkerTestParams.clock(),
+ streamingDataflowWorkerTestParams.executorSupplier(),
+ streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
+ streamingDataflowWorkerTestParams.maxWorkItemCommitBytes());
this.computationStateCache = worker.getComputationStateCache();
return worker;
}
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Supplier<Instant> clock,
- Function<String, ScheduledExecutorService> executorSupplier) {
- return makeWorker(instructions, options, publishCounters, clock, executorSupplier, -1);
- }
-
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters) {
- return makeWorker(
- instructions,
- options,
- publishCounters,
- Instant::now,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- -1);
- }
-
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Map<String, String> stateNameMap) {
- return makeWorker(
- stateNameMap,
- instructions,
- options,
- publishCounters,
- Instant::now,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- -1);
- }
-
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- int localRetryTimeoutMs) {
- return makeWorker(
- instructions,
- options,
- publishCounters,
- Instant::now,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- localRetryTimeoutMs);
- }
-
@Test
public void testBasicHarness() throws Exception {
List<ParallelInstruction> instructions =
Arrays.asList(
makeSourceInstruction(StringUtf8Coder.of()),
makeSinkInstruction(StringUtf8Coder.of(), 0));
-
StreamingDataflowWorker worker =
- makeWorker(instructions, createTestingPipelineOptions(), true /* publishCounters */);
+ makeWorker(
+ defaultWorkerParams()
+ .setInstructions(instructions)
+ .publishCounters()
+ .setOptions(createTestingPipelineOptions())
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org