scwhittle commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1598094225
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -800,110 +805,39 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args
}
private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Supplier<Instant> clock,
- Function<String, ScheduledExecutorService> executorSupplier,
- int localRetryTimeoutMs) {
- return makeWorker(
- ImmutableMap.of(),
- instructions,
- options,
- publishCounters,
- clock,
- executorSupplier,
- localRetryTimeoutMs);
- }
-
- private StreamingDataflowWorker makeWorker(
- Map<String, String> stateNameMappings,
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Supplier<Instant> clock,
- Function<String, ScheduledExecutorService> executorSupplier,
- int localRetryTimeoutMs) {
- ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
- stateNameMap.putAll(stateNameMappings);
- stateNameMap.putAll(ImmutableMap.of(DEFAULT_PARDO_USER_NAME, DEFAULT_PARDO_STATE_FAMILY));
+ StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) {
StreamingDataflowWorker worker =
StreamingDataflowWorker.forTesting(
- stateNameMap,
+ streamingDataflowWorkerTestParams.stateNameMappings(),
server,
- Collections.singletonList(defaultMapTask(instructions)),
+ Collections.singletonList(
+ defaultMapTask(streamingDataflowWorkerTestParams.instructions())),
IntrinsicMapTaskExecutorFactory.defaultFactory(),
mockWorkUnitClient,
- options,
- publishCounters,
+ streamingDataflowWorkerTestParams.options(),
+ streamingDataflowWorkerTestParams.publishCounters(),
hotKeyLogger,
- clock,
- executorSupplier,
- localRetryTimeoutMs);
+ streamingDataflowWorkerTestParams.clock(),
+ streamingDataflowWorkerTestParams.executorSupplier(),
+ streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
+ streamingDataflowWorkerTestParams.maxWorkItemCommitBytes());
this.computationStateCache = worker.getComputationStateCache();
return worker;
}
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Supplier<Instant> clock,
- Function<String, ScheduledExecutorService> executorSupplier) {
- return makeWorker(instructions, options, publishCounters, clock, executorSupplier, -1);
- }
-
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters) {
- return makeWorker(
- instructions,
- options,
- publishCounters,
- Instant::now,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- -1);
- }
-
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- Map<String, String> stateNameMap) {
- return makeWorker(
- stateNameMap,
- instructions,
- options,
- publishCounters,
- Instant::now,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- -1);
- }
-
- private StreamingDataflowWorker makeWorker(
- List<ParallelInstruction> instructions,
- DataflowWorkerHarnessOptions options,
- boolean publishCounters,
- int localRetryTimeoutMs) {
- return makeWorker(
- instructions,
- options,
- publishCounters,
- Instant::now,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- localRetryTimeoutMs);
- }
-
@Test
public void testBasicHarness() throws Exception {
List<ParallelInstruction> instructions =
Arrays.asList(
makeSourceInstruction(StringUtf8Coder.of()),
makeSinkInstruction(StringUtf8Coder.of(), 0));
-
StreamingDataflowWorker worker =
- makeWorker(instructions, createTestingPipelineOptions(), true /* publishCounters */);
+ makeWorker(
+ defaultWorkerParams()
+ .setInstructions(instructions)
+ .publishCounters()
+ .setOptions(createTestingPipelineOptions())
Review Comment:
can remove setOptions(createTestingPipelineOptions()) since it's the default
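For example, a sketch of the call without the redundant setter (assuming defaultWorkerParams() already seeds createTestingPipelineOptions() and the chain is finished with build(); neither detail is visible in this hunk):

    StreamingDataflowWorker worker =
        makeWorker(
            defaultWorkerParams()
                .setInstructions(instructions)
                .publishCounters()
                .build()); // build() is assumed; only the chained setters appear in the diff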
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3203,12 +3145,21 @@ public void testExceptionInvalidatesCache() throws Exception {
WindowingStrategy.globalDefault()),
makeSinkInstruction(StringUtf8Coder.of(), 1,
GlobalWindow.Coder.INSTANCE));
+ defaultWorkerParams()
Review Comment:
rm, not used
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -218,32 +241,46 @@ public void testGetAllComputations() {
@Test
public void testTotalCurrentActiveGetWorkBudget() {
String computationId = "computationId";
+ String computationId2 = "computationId2";
MapTask mapTask = new MapTask().setStageName("stageName").setSystemName("systemName");
Map<String, String> userTransformToStateFamilyName =
ImmutableMap.of("userTransformName", "stateFamilyName");
ComputationConfig computationConfig =
ComputationConfig.create(mapTask, userTransformToStateFamilyName, ImmutableMap.of());
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ when(configFetcher.fetchConfig(eq(computationId2))).thenReturn(Optional.of(computationConfig));
+ Work work1 = createWork(1, 1);
+ Work work2 = createWork(2, 2);
+ Work work3 = createWork(3, 3);
+
+ // Activate 3 Work(s) for computationId
Optional<ComputationState> maybeComputationState = computationStateCache.get(computationId);
assertTrue(maybeComputationState.isPresent());
-
ComputationState computationState = maybeComputationState.get();
ShardedKey shardedKey = ShardedKey.create(ByteString.EMPTY, 1);
- Work work1 = createWork(1, 1);
- Work work2 = createWork(2, 2);
- Work work3 = createWork(3, 3);
computationState.activateWork(shardedKey, work1);
computationState.activateWork(shardedKey, work2);
computationState.activateWork(shardedKey, work3);
+ // Activate 3 Work(s) for computationId2
+ Optional<ComputationState> maybeComputationState2 = computationStateCache.get(computationId);
Review Comment:
nit: how about just activating work1 on computationState, and work2 and work3 on computationState2? Then we're not deviating from normal usage of Work (it shouldn't be activated on multiple/different computations) in case it causes issues down the line.
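Something like this sketch, reusing the names from the diff (computationState2 and the maybeComputationState2.get() step are assumed to mirror the existing pattern; the quoted line above also fetches computationId rather than computationId2, which looks unintended, so the sketch assumes the second lookup should use computationId2):

    // Activate work1 on the first computation only.
    computationState.activateWork(shardedKey, work1);

    // Activate work2 and work3 on the second computation, so that no Work
    // instance is ever active on more than one computation.
    Optional<ComputationState> maybeComputationState2 =
        computationStateCache.get(computationId2);
    assertTrue(maybeComputationState2.isPresent());
    ComputationState computationState2 = maybeComputationState2.get();
    computationState2.activateWork(shardedKey, work2);
    computationState2.activateWork(shardedKey, work3);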
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3368,7 +3318,13 @@ public void testActiveWorkRefresh() throws Exception {
DataflowWorkerHarnessOptions options = createTestingPipelineOptions();
options.setActiveWorkRefreshPeriodMillis(100);
- StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
+ StreamingDataflowWorker worker =
+ makeWorker(
+ defaultWorkerParams()
Review Comment:
does passing in "--activeWorkRefreshPeriodMillis=100" let you get rid of
options?
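i.e., a sketch of what that could look like (createTestingPipelineOptions(String... args) is visible in the first hunk; build() and the builder's defaults are assumptions):

    StreamingDataflowWorker worker =
        makeWorker(
            defaultWorkerParams()
                .setInstructions(instructions)
                .publishCounters()
                // assumed: the flag reaches PipelineOptionsFactory via the varargs,
                // making the separate options.setActiveWorkRefreshPeriodMillis(100)
                // mutation above unnecessary
                .setOptions(createTestingPipelineOptions("--activeWorkRefreshPeriodMillis=100"))
                .build());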
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3391,7 +3347,13 @@ public void testActiveWorkFailure() throws Exception {
DataflowWorkerHarnessOptions options = createTestingPipelineOptions();
options.setActiveWorkRefreshPeriodMillis(100);
- StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
+ StreamingDataflowWorker worker =
+ makeWorker(
+ defaultWorkerParams()
Review Comment:
ditto, here and below
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
[email protected]