akalash commented on a change in pull request #16556:
URL: https://github.com/apache/flink/pull/16556#discussion_r674015219
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
##########
@@ -238,6 +240,19 @@
private static final String SOLUTION_SET_OBJECTS = "itertive.ss.obj";
+ public static final ConfigOption<Integer> THROUGHPUT_CALCULATION_PERIOD =
+ ConfigOptions.key("io.throughput.calculationPeriod")
+ .intType()
+ .defaultValue(500)
+ .withDescription("The time between throughput calculations, in milliseconds.");
+
+ public static final ConfigOption<Integer> THROUGHPUT_EMA_LAST_VALUES =
Review comment:
It is not clear where both of these configurations should live.
THROUGHPUT_CALCULATION_PERIOD relates to streaming, so perhaps it is better to
move it to StreamConfig, but keeping the two options in separate places is also
not a good idea.
Any ideas on the right place for these configs?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/EMAThroughputCalculator.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Implementation of 'Exponential moving average' algorithm. */
+public class EMAThroughputCalculator implements AverageThroughputCalculator {
+ private SMAThroughputCalculator smaThroughputCalculator;
+ private long currentThroughput;
+
+ /** EMA-specific smoothing constant that determines how quickly the average reacts to new values. */
+ private final double alpha;
+
+ public EMAThroughputCalculator(int numberOfPreviousValues) {
+ smaThroughputCalculator = new SMAThroughputCalculator(numberOfPreviousValues);
+ alpha = 2.0 / (numberOfPreviousValues + 1);
+ }
+
+ @Override
+ public long calculateThroughput(long dataSize, long time) {
+ checkArgument(dataSize >= 0, "Size of data should be non-negative");
+ checkArgument(time >= 0, "Time should be non-negative");
+
+ if (time == 0) {
+ return currentThroughput;
+ }
+
+ if (smaThroughputCalculator.isWarmedUp()) {
+ currentThroughput += alpha * (dataSize * MILLIS_IN_SECOND / time - currentThroughput);
Review comment:
> dataSize * MILLIS_IN_SECOND
It may not be critical, but this multiplication can overflow for large
`dataSize` values, so it is probably better to rewrite it.
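A minimal sketch of one way to avoid the overflow, assuming `MILLIS_IN_SECOND` is 1000 and throughput is measured in bytes per second. `OverflowSafeEma` is a hypothetical standalone class, not the PR's code: it computes the instant throughput in `double`, so `dataSize * MILLIS_IN_SECOND` cannot wrap around, and it keeps the running average as a `double` so small EMA steps are not truncated. For simplicity it seeds the average with the first sample instead of using an SMA warm-up as the PR does.

```java
/**
 * Hypothetical sketch, not the PR's implementation: an exponential moving average of
 * throughput that does not overflow on dataSize * MILLIS_IN_SECOND.
 */
final class OverflowSafeEma {
    // Assumption: the PR's MILLIS_IN_SECOND constant is 1000.
    static final long MILLIS_IN_SECOND = 1000;

    private final double alpha;       // smoothing factor 2 / (N + 1), as in the PR
    private double currentThroughput; // kept as double so small steps are not truncated
    private boolean seeded;           // false until the first sample with non-zero time

    OverflowSafeEma(int numberOfPreviousValues) {
        if (numberOfPreviousValues <= 0) {
            throw new IllegalArgumentException("Number of previous values should be positive.");
        }
        this.alpha = 2.0 / (numberOfPreviousValues + 1);
    }

    /** Bytes per second; the double product cannot wrap, and the cast to long saturates. */
    static long instantThroughput(long dataSize, long timeMillis) {
        return (long) ((double) dataSize * MILLIS_IN_SECOND / timeMillis);
    }

    long calculateThroughput(long dataSize, long timeMillis) {
        if (dataSize < 0 || timeMillis < 0) {
            throw new IllegalArgumentException("Size of data and time should be non-negative.");
        }
        if (timeMillis == 0) {
            return (long) currentThroughput; // no new information, keep the last estimate
        }
        double instant = (double) dataSize * MILLIS_IN_SECOND / timeMillis;
        if (!seeded) {
            currentThroughput = instant; // simplification: seed with the first sample
            seeded = true;
        } else {
            currentThroughput += alpha * (instant - currentThroughput);
        }
        return (long) currentThroughput;
    }
}
```

With `numberOfPreviousValues = 3` (so alpha is 0.5), feeding 1000 bytes in 500 ms and then 3000 bytes in 500 ms yields 2000 and then 4000 bytes per second. Java's narrowing conversion from `double` to `long` saturates at `Long.MAX_VALUE`, so an extreme input gives a clamped value rather than a negative one; the trade-off is lost precision above 2^53, which is negligible for a throughput estimate.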
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -417,11 +424,18 @@ public Task(
.toArray(new IndexedInputGate[0]);
this.inputGates = new IndexedInputGate[gates.length];
+ this.throughputMeter =
+ new ThroughputMeter(
+ SystemClock.getInstance(),
+ new EMAThroughputCalculator(
+ taskConfiguration.get(THROUGHPUT_EMA_LAST_VALUES)));
Review comment:
This is one more sign of the problem with the configuration. I'm not sure that
we should read this directly from taskConfiguration, but I don't know another way
for now.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/SMAThroughputCalculator.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import static org.apache.flink.runtime.throughput.AverageThroughputCalculator.instantThroughput;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Implementation of 'Simple moving average' algorithm. */
+public class SMAThroughputCalculator implements AverageThroughputCalculator {
+ /** The sum of the last throughputs.length throughputs. */
+ private long totalThroughput;
+ /** The ring buffer of the last calculated throughputs. */
+ private long[] throughputs;
+ /** Current position in the throughputs. */
+ private int currentIndex = -1;
+ /** The number of last values which should be taken into account during calculation. */
+ private int numberOfPreviousValues = 0;
+
+ /**
+ * @param numberOfPreviousValues How many previous values should be taken into account during
+ * the calculation.
+ */
+ public SMAThroughputCalculator(int numberOfPreviousValues) {
+ checkState(numberOfPreviousValues > 0, "Number of previous values should be positive.");
+ throughputs = new long[numberOfPreviousValues];
+ }
+
+ @Override
+ public long calculateThroughput(long dataSize, long time) {
+ checkArgument(dataSize >= 0, "Size of data should be non-negative");
+ checkArgument(time >= 0, "Time should be non-negative");
+
+ if (time == 0) {
+ return numberOfPreviousValues == 0 ? 0 : totalThroughput / numberOfPreviousValues;
+ }
+
+ long newThroughput = instantThroughput(dataSize, time);
+
+ currentIndex = (currentIndex + 1) % throughputs.length;
+ totalThroughput -= throughputs[currentIndex];
+ totalThroughput += (throughputs[currentIndex] = newThroughput);
Review comment:
totalThroughput can overflow because it is the sum of the last several values.
So it is possible to rewrite the average in terms of the previous one:
`throughputPrevious + (currentInstantThroughput - instantThroughputNValuesAgo) / N`,
where N is the desired number of last values.
This approach is not applicable during the warm-up period, so the final
solution will be a bit more complex.
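One possible shape of this suggestion, sketched under assumptions: `IncrementalSma` is a hypothetical class (not the PR's `SMAThroughputCalculator`) that takes already-computed, non-negative instant throughputs. During warm-up it uses the cumulative-average update `avg += (x - avg) / k` for the k-th sample; once the window is full it applies the formula above, `avg += (x - xNValuesAgo) / N`. The average is kept as a `double` so integer division does not accumulate truncation error, and there is no running sum that could overflow.

```java
/**
 * Hypothetical sketch, not the PR's implementation: a simple moving average over the
 * last N instant throughputs, updated from the previous average instead of a running sum.
 * Assumes all samples are non-negative.
 */
final class IncrementalSma {
    private final long[] window; // ring buffer of the last N instant throughputs
    private int nextIndex;       // slot the next sample goes into (the oldest once full)
    private int filled;          // number of samples seen so far, capped at window.length
    private double average;      // current moving average

    IncrementalSma(int numberOfPreviousValues) {
        if (numberOfPreviousValues <= 0) {
            throw new IllegalArgumentException("Number of previous values should be positive.");
        }
        window = new long[numberOfPreviousValues];
    }

    long add(long instantThroughput) {
        if (filled < window.length) {
            // Warm-up: cumulative average over the samples seen so far.
            filled++;
            average += (instantThroughput - average) / filled;
        } else {
            // Full window: previous + (current - valueNSamplesAgo) / N.
            average += (double) (instantThroughput - window[nextIndex]) / window.length;
        }
        window[nextIndex] = instantThroughput;
        nextIndex = (nextIndex + 1) % window.length;
        return Math.round(average); // Math.round saturates at Long.MAX_VALUE
    }
}
```

Feeding 100, 200, 300, 400 into a window of 3 yields 100, 150, 200 and then 300 (the mean of 200, 300 and 400). The trade-off against an exact `long` sum is that floating-point rounding can drift slightly over very long runs.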
--
This is an automated message from the Apache Git Service.