akalash commented on a change in pull request #16710:
URL: https://github.com/apache/flink/pull/16710#discussion_r682810375
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1760,39 +1760,31 @@ protected void cancelTask() {
*/
@Test
public void testThroughputSchedulerStartsOnInvoke() throws Exception {
- // given: Throughput meter which finishes the task when calculateThroughput was called.
- UnAvailableTestInputProcessor inputProcessor = new UnAvailableTestInputProcessor();
- AtomicInteger desirableNumberOfCalculation = new AtomicInteger(2);
- try (MockEnvironment mockEnvironment =
- new MockEnvironmentBuilder()
- .setTaskConfiguration(
- new Configuration()
- .set(BUFFER_DEBLOAT_PERIOD, Duration.ofMillis(1)))
+ CompletableFuture<?> finishFuture = new CompletableFuture<>();
+ try (StreamTaskMailboxTestHarness<String> harness =
+ new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
+ .modifyStreamConfig(
+ config ->
+ config.getConfiguration()
+ .set(BUFFER_DEBLOAT_PERIOD, Duration.ofMillis(1)))
+ .addInput(STRING_TYPE_INFO)
+ .setupOutputForSingletonOperatorChain(
+ new TestBoundedOneInputStreamOperator())
.setThroughputMeter(
Review comment:
Oh, I forgot to rename the setter to `setThroughputCalculator`. I need to
remember to fix it.