pnowojski commented on a change in pull request #16556:
URL: https://github.com/apache/flink/pull/16556#discussion_r678020435



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import org.apache.flink.util.clock.Clock;
+
+/** Class for measuring the throughput based on incoming data size and 
measurement period. */
+public class ThroughputCalculator {
+    private static final long NOT_TRACKED = -1;
+    private final Clock clock;
+    private final ThroughputEMA throughputEMA;
+
+    /** Accumulated data size for current period. */
+    private long currentSize;
+
+    /** Total measurement time for current period. */
+    private long currentTime;
+
+    /** The start of measurement time. */
+    private long startTime = NOT_TRACKED;
+
+    /** The last throughput . */

Review comment:
       nit: noop comment?

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -512,6 +512,21 @@
                                     + " the configured min/max size, the 
min/max size will be used. The exact size of Network Memory can be"
                                     + " explicitly specified by setting the 
min/max size to the same value.");
 
+    /** The period between recalculation amount of the in-flight data. */
+    public static final ConfigOption<Integer> IN_FLIGHT_DATA_ADJUSTMENT_PERIOD 
=
+            
ConfigOptions.key("taskmanager.memory.network.in-flight-data-adjustment.period")
+                    .intType()
+                    .defaultValue(500)
+                    .withDescription("The amount of time between calculation 
the throughput(ms).");

Review comment:
       adjust the description?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1727,6 +1731,99 @@ protected void cancelTask() {
         assertTrue(OpenFailingOperator.wasClosed);
     }
 
+    @Test
+    public void testMailboxNotifyThroughputMeterAboutIdleTime() throws 
Exception {
+        final long sleepTimeOutsideMail = 42;
+        final long sleepTimeInsideMail = 44;
+        int incomingDataSize = 10_000;
+
+        @Nullable WaitingThread waitingThread = null;
+        try (final MockEnvironment environment = setupEnvironment(true, true)) 
{
+            final UnAvailableTestInputProcessor inputProcessor =
+                    new UnAvailableTestInputProcessor();
+            final StreamTask task =
+                    new MockStreamTaskBuilder(environment)
+                            .setStreamInputProcessor(inputProcessor)
+                            .build();
+            TaskIOMetricGroup ioMetricGroup =
+                    task.getEnvironment().getMetricGroup().getIOMetricGroup();
+            ThroughputCalculator throughputCalculator = 
environment.getThroughputMeter();
+
+            final MailboxExecutor executor = 
task.mailboxProcessor.getMainMailboxExecutor();
+            final RunnableWithException completeFutureTask =
+                    () ->
+                            inputProcessor
+                                    .availabilityProvider
+                                    .getUnavailableToResetAvailable()
+                                    .complete(null);
+
+            waitingThread =
+                    new WaitingThread(
+                            executor,
+                            completeFutureTask,
+                            sleepTimeInsideMail,
+                            sleepTimeOutsideMail,
+                            ioMetricGroup.getIdleTimeMsPerSecond());
+            // Make sure WaitingThread is started after Task starts processing.
+            executor.submit(
+                    waitingThread::start,
+                    "Start WaitingThread after Task starts processing input.");
+
+            long startTs = System.currentTimeMillis();
+            throughputCalculator.calculateThroughput();
+            throughputCalculator.incomingDataSize(incomingDataSize);
+            task.invoke();
+            long resultThroughput = throughputCalculator.calculateThroughput();
+            long totalDuration = System.currentTimeMillis() - startTs;
+
+            assertThat(
+                    resultThroughput,
+                    greaterThanOrEqualTo(
+                            incomingDataSize * 1000 / (totalDuration - 
sleepTimeOutsideMail)));
+        } finally {
+            if (waitingThread != null) {
+                waitingThread.join();
+            }
+        }
+    }
+
+    @Test
+    public void testThroughputSchedulerStartsOnInvoke() throws Exception {
+        // given: Throughput meter which finishes the task when 
calculateThroughput was called.
+        UnAvailableTestInputProcessor inputProcessor = new 
UnAvailableTestInputProcessor();
+        AtomicInteger desirableNumberOfCalculation = new AtomicInteger(2);
+        try (MockEnvironment mockEnvironment =
+                new MockEnvironmentBuilder()
+                        .setTaskConfiguration(
+                                new 
Configuration().set(IN_FLIGHT_DATA_ADJUSTMENT_PERIOD, 1))
+                        .setThroughputMeter(
+                                new 
ThroughputCalculator(SystemClock.getInstance(), 10) {
+                                    @Override
+                                    public long calculateThroughput() {
+                                        if 
(desirableNumberOfCalculation.decrementAndGet() == 0) {
+                                            inputProcessor
+                                                    .availabilityProvider
+                                                    
.getUnavailableToResetAvailable()
+                                                    .complete(null);
+                                        }
+                                        return super.calculateThroughput();
+                                    }
+                                })
+                        .build()) {
+
+            StreamTask task =
+                    new MockStreamTaskBuilder(mockEnvironment)
+                            .setStreamInputProcessor(inputProcessor)
+                            .build();
+
+            // when: Starting the task.
+            task.invoke();
+
+            // then: The task successfully finishes because throughput 
calculation was called from
+            // scheduler which leads to the finish of the task
+        }
+    }

Review comment:
       What is the purpose of this test? Isn't it superseded by (or a subset of) 
`testMailboxNotifyThroughputMeterAboutIdleTime`?

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -512,6 +512,21 @@
                                     + " the configured min/max size, the 
min/max size will be used. The exact size of Network Memory can be"
                                     + " explicitly specified by setting the 
min/max size to the same value.");
 
+    /** The period between recalculation amount of the in-flight data. */
+    public static final ConfigOption<Integer> IN_FLIGHT_DATA_ADJUSTMENT_PERIOD 
=
+            
ConfigOptions.key("taskmanager.memory.network.in-flight-data-adjustment.period")

Review comment:
       as per offline discussion: 
   ```
   taskmanager.network.memory.automatic-buffer-adjustment.period
   taskmanager.network.memory.automatic-buffer-adjustment.samples
   taskmanager.network.memory.automatic-buffer-adjustment.enabled #(future PR)
   ```?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import org.apache.flink.util.clock.Clock;
+
+/** Class for measuring the throughput based on incoming data size and 
measurement period. */
+public class ThroughputCalculator {
+    private static final long NOT_TRACKED = -1;
+    private final Clock clock;
+    private final ThroughputEMA throughputEMA;
+
+    /** Accumulated data size for current period. */
+    private long currentSize;
+
+    /** Total measurement time for current period. */
+    private long currentTime;
+
+    /** The start of measurement time. */
+    private long startTime = NOT_TRACKED;
+
+    /** The last throughput . */
+    private long lastThroughput;
+
+    public ThroughputCalculator(Clock clock, int numberOfSamples) {
+        this.clock = clock;
+        this.throughputEMA = new ThroughputEMA(numberOfSamples);
+    }
+
+    /** @param size The size of received data. */

Review comment:
       nit: rename `size` -> `receivedDataSize` and drop the comment? 
   
   (and ditto in other places like `currentSize` -> 
`currentAccumulatedDataSize`, `currentTime` -> `currentMeasurementTime`, 
`startTime` -> `measurementStartTime`)?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1727,6 +1731,99 @@ protected void cancelTask() {
         assertTrue(OpenFailingOperator.wasClosed);
     }
 
+    @Test
+    public void testMailboxNotifyThroughputMeterAboutIdleTime() throws 
Exception {
+        final long sleepTimeOutsideMail = 42;
+        final long sleepTimeInsideMail = 44;
+        int incomingDataSize = 10_000;
+
+        @Nullable WaitingThread waitingThread = null;
+        try (final MockEnvironment environment = setupEnvironment(true, true)) 
{
+            final UnAvailableTestInputProcessor inputProcessor =
+                    new UnAvailableTestInputProcessor();
+            final StreamTask task =
+                    new MockStreamTaskBuilder(environment)
+                            .setStreamInputProcessor(inputProcessor)
+                            .build();
+            TaskIOMetricGroup ioMetricGroup =
+                    task.getEnvironment().getMetricGroup().getIOMetricGroup();
+            ThroughputCalculator throughputCalculator = 
environment.getThroughputMeter();
+
+            final MailboxExecutor executor = 
task.mailboxProcessor.getMainMailboxExecutor();
+            final RunnableWithException completeFutureTask =
+                    () ->
+                            inputProcessor
+                                    .availabilityProvider
+                                    .getUnavailableToResetAvailable()
+                                    .complete(null);
+
+            waitingThread =
+                    new WaitingThread(
+                            executor,
+                            completeFutureTask,
+                            sleepTimeInsideMail,
+                            sleepTimeOutsideMail,
+                            ioMetricGroup.getIdleTimeMsPerSecond());
+            // Make sure WaitingThread is started after Task starts processing.
+            executor.submit(
+                    waitingThread::start,
+                    "Start WaitingThread after Task starts processing input.");
+
+            long startTs = System.currentTimeMillis();
+            throughputCalculator.calculateThroughput();
+            throughputCalculator.incomingDataSize(incomingDataSize);
+            task.invoke();
+            long resultThroughput = throughputCalculator.calculateThroughput();
+            long totalDuration = System.currentTimeMillis() - startTs;
+
+            assertThat(
+                    resultThroughput,
+                    greaterThanOrEqualTo(
+                            incomingDataSize * 1000 / (totalDuration - 
sleepTimeOutsideMail)));

Review comment:
       This test duplicates quite a lot of code from 
`testProcessWithUnAvailableInput`. I would either:
   1. put those highlighted lines inside `testProcessWithUnAvailableInput`
   2. or deduplicate the test setup in some other way
   
   I'm not sure how 2. could be done easily, so I would go with 1?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to