akalash commented on a change in pull request #16556:
URL: https://github.com/apache/flink/pull/16556#discussion_r674015219



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
##########
@@ -238,6 +240,19 @@
 
     private static final String SOLUTION_SET_OBJECTS = "itertive.ss.obj";
 
+    public static final ConfigOption<Integer> THROUGHPUT_CALCULATION_PERIOD =
+            ConfigOptions.key("io.throughput.calculationPeriod")
+                    .intType()
+                    .defaultValue(500)
+                    .withDescription("The amount of time between calculation the throughput(ms).");
+
+    public static final ConfigOption<Integer> THROUGHPUT_EMA_LAST_VALUES =

Review comment:
       It is not clear where these two configuration options should live. 
THROUGHPUT_CALCULATION_PERIOD relates to streaming, so it might be better to 
move it to StreamConfig, but keeping the two options in separate places is not 
a good idea either. 
   Any ideas for the right place for these configs?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/EMAThroughputCalculator.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Implementation of 'Exponential moving average' algorithm. */
+public class EMAThroughputCalculator implements AverageThroughputCalculator {
+    private SMAThroughputCalculator smaThroughputCalculator;
+    private long currentThroughput;
+
+    /** EMA algorithm specific constant which responsible for speed of reaction. */
+    private final double alpha;
+
+    public EMAThroughputCalculator(int numberOfPreviousValues) {
+        smaThroughputCalculator = new SMAThroughputCalculator(numberOfPreviousValues);
+        alpha = 2.0 / (numberOfPreviousValues + 1);
+    }
+
+    @Override
+    public long calculateThroughput(long dataSize, long time) {
+        checkArgument(dataSize >= 0, "Size of data should be non negative");
+        checkArgument(time >= 0, "Time should be non negative");
+
+        if (time == 0) {
+            return currentThroughput;
+        }
+
+        if (smaThroughputCalculator.isWarmedUp()) {
+            currentThroughput += alpha * (dataSize * MILLIS_IN_SECOND / time - currentThroughput);

Review comment:
       > dataSize * MILLIS_IN_SECOND
   
   Maybe it is not critical, but this multiplication can overflow, so perhaps 
it is better to rewrite it.
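
   One possible rewrite, as a rough sketch only: check whether the product fits 
into a long and fall back to floating point when it does not. The class name 
`SafeThroughput` and the method `safeInstantThroughput` are hypothetical and 
are not part of this PR; `MILLIS_IN_SECOND` is assumed to be 1000.

```java
/** Hypothetical helper, not part of the PR: overflow-aware units-per-second calculation. */
final class SafeThroughput {
    // Assumption: same meaning as the MILLIS_IN_SECOND constant used in the diff.
    static final long MILLIS_IN_SECOND = 1000;

    static long safeInstantThroughput(long dataSize, long timeMillis) {
        if (dataSize < 0 || timeMillis <= 0) {
            throw new IllegalArgumentException(
                    "dataSize must be non-negative and timeMillis must be positive");
        }
        // Fast path: the product fits into a long, so exact integer math is safe.
        if (dataSize <= Long.MAX_VALUE / MILLIS_IN_SECOND) {
            return dataSize * MILLIS_IN_SECOND / timeMillis;
        }
        // Slow path: compute in double. Precision drops for such huge values, and the
        // cast saturates at Long.MAX_VALUE instead of wrapping to a negative number.
        return (long) ((double) dataSize / timeMillis * MILLIS_IN_SECOND);
    }
}
```

   The fast path keeps the current integer result for realistic sizes, so only 
the extreme values take the less precise branch.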

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -417,11 +424,18 @@ public Task(
                         .toArray(new IndexedInputGate[0]);
 
         this.inputGates = new IndexedInputGate[gates.length];
+        this.throughputMeter =
+                new ThroughputMeter(
+                        SystemClock.getInstance(),
+                        new EMAThroughputCalculator(
+                                taskConfiguration.get(THROUGHPUT_EMA_LAST_VALUES)));

Review comment:
       One more sign of the problem with the configuration. I am not sure that 
we should get this directly from taskConfiguration, but I don't know another 
way for now.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/SMAThroughputCalculator.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import static org.apache.flink.runtime.throughput.AverageThroughputCalculator.instantThroughput;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Implementation of 'Simple moving average' algorithm. */
+public class SMAThroughputCalculator implements AverageThroughputCalculator {
+    /** The sum of the last throughputs.length throughputs. */
+    private long totalThroughput;
+    /** The buffer of the last calculated throughput. */
+    private long[] throughputs;
+    /** Current position in the throughputs. */
+    private int currentIndex = -1;
+    /** The number of last values which should be taken into account during calculation. */
+    private int numberOfPreviousValues = 0;
+
+    /**
+     * @param numberOfPreviousValues How many previous values should be taken into account during
+     *     the calculation.
+     */
+    public SMAThroughputCalculator(int numberOfPreviousValues) {
+        checkState(numberOfPreviousValues > 0, "Number of previous values should be positive.");
+        throughputs = new long[numberOfPreviousValues];
+    }
+
+    @Override
+    public long calculateThroughput(long dataSize, long time) {
+        checkArgument(dataSize >= 0, "Size of data should be non negative");
+        checkArgument(time >= 0, "Time should be non negative");
+
+        if (time == 0) {
+            return numberOfPreviousValues == 0 ? 0 : totalThroughput / numberOfPreviousValues;
+        }
+
+        long newThroughput = instantThroughput(dataSize, time);
+
+        currentIndex = (currentIndex + 1) % throughputs.length;
+        totalThroughput -= throughputs[currentIndex];
+        totalThroughput += (throughputs[currentIndex] = newThroughput);

Review comment:
       totalThroughput can overflow because it is the sum of the last several 
values. It is possible to rewrite the calculation in terms of the previous 
average:
   `throughputPrevious + (currentInstantThroughput - 
instantThroughputNValuesAgo)/N`, where N is the desired number of last values.
   This approach is not applicable during the warm-up period, so the final 
solution will be a little more complicated.
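
   To illustrate the idea, here is a minimal sketch that keeps only the running 
average and never stores the sum. The class `IncrementalSma` and its `add` 
method are hypothetical names, not code from this PR. It assumes the average 
may be held as a double, and it handles the warm-up period with a running mean 
over the values seen so far.

```java
/** Hypothetical sketch, not part of the PR: simple moving average without a running sum. */
final class IncrementalSma {
    private final long[] window;
    /** Number of values stored so far, capped at window.length. */
    private int count;
    /** Slot that holds the oldest value once the window is full. */
    private int nextIndex;
    /** Current average; kept as double so repeated integer truncation does not accumulate. */
    private double average;

    IncrementalSma(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("Window size should be positive");
        }
        window = new long[size];
    }

    long add(long value) {
        if (count < window.length) {
            // Warm-up: running mean over the values seen so far.
            count++;
            average += (value - average) / count;
        } else {
            // Steady state: previous average + (newest - oldest) / N.
            average += ((double) value - window[nextIndex]) / window.length;
        }
        window[nextIndex] = value;
        nextIndex = (nextIndex + 1) % window.length;
        return (long) average;
    }
}
```

   The trade-off is that floating-point rounding error can slowly accumulate in 
`average` over a very long run, so periodically recomputing it from the window 
might be worth considering.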
