pnowojski commented on a change in pull request #16556:
URL: https://github.com/apache/flink/pull/16556#discussion_r676542558



##########
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
##########
@@ -285,6 +286,11 @@ public TaskEventDispatcher getTaskEventDispatcher() {
         throw new UnsupportedOperationException(ERROR_MSG);
     }
 
+    @Override
+    public ThroughputMeter getThroughputMeter() {
+        return null;

Review comment:
       Why return `null` here? Shouldn't this `throw new UnsupportedOperationException(ERROR_MSG)`, like the neighbouring methods?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/EMAThroughputCalculator.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Implementation of 'Exponential moving average' algorithm. */
+public class EMAThroughputCalculator implements AverageThroughputCalculator {
+    private SMAThroughputCalculator smaThroughputCalculator;
+    private long currentThroughput;
+
+    /** EMA algorithm specific constant which responsible for speed of reaction. */
+    private final double alpha;
+
+    public EMAThroughputCalculator(int numberOfPreviousValues) {
+        smaThroughputCalculator = new SMAThroughputCalculator(numberOfPreviousValues);
+        alpha = 2.0 / (numberOfPreviousValues + 1);
+    }
+
+    @Override
+    public long calculateThroughput(long dataSize, long time) {
+        checkArgument(dataSize >= 0, "Size of data should be non negative");
+        checkArgument(time >= 0, "Time should be non negative");
+
+        if (time == 0) {
+            return currentThroughput;
+        }
+
+        if (smaThroughputCalculator.isWarmedUp()) {

Review comment:
       Why do we need `SMAThroughputCalculator`? Why are we sampling many 
throughputs to calculate the EMA? Is it only to handle the "warm up" period? 
If so, why not just initialise the EMA with the first sample?
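A minimal sketch of that suggestion, not the PR's implementation: the class name `SeededEmaThroughputCalculator`, the `seeded` flag, and the bytes-per-second scaling are assumptions made for illustration. Only the `alpha = 2.0 / (n + 1)` formula and the `time == 0` shortcut are taken from the diff above.

```java
/** Hypothetical sketch: an EMA seeded with the first sample instead of an SMA warm-up. */
final class SeededEmaThroughputCalculator {
    // Assumption: time is given in milliseconds and throughput is reported per second.
    private static final long MILLIS_IN_SECOND = 1000;

    private final double alpha;
    private boolean seeded;
    private double currentThroughput;

    SeededEmaThroughputCalculator(int numberOfSamples) {
        if (numberOfSamples <= 0) {
            throw new IllegalArgumentException("Number of samples should be positive");
        }
        // Same smoothing constant as in the diff under review.
        this.alpha = 2.0 / (numberOfSamples + 1);
    }

    long calculateThroughput(long dataSize, long timeMillis) {
        if (dataSize < 0 || timeMillis < 0) {
            throw new IllegalArgumentException("Data size and time should be non negative");
        }
        if (timeMillis == 0) {
            // No time elapsed: keep reporting the previous value.
            return (long) currentThroughput;
        }
        double sample = (double) dataSize / timeMillis * MILLIS_IN_SECOND;
        if (seeded) {
            currentThroughput = alpha * sample + (1 - alpha) * currentThroughput;
        } else {
            // The first sample becomes the initial EMA, so no separate warm-up is needed.
            currentThroughput = sample;
            seeded = true;
        }
        return (long) currentThroughput;
    }
}
```

With three samples `alpha` is 0.5, so a first sample of 1000 bytes/s followed by a sample of 3000 bytes/s yields 1000 and then 2000.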

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
##########
@@ -238,6 +240,19 @@
 
     private static final String SOLUTION_SET_OBJECTS = "itertive.ss.obj";
 
+    public static final ConfigOption<Integer> THROUGHPUT_CALCULATION_PERIOD =
+            ConfigOptions.key("io.throughput.calculationPeriod")

Review comment:
       Use `calculation-period` instead of camel case (see 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputMeter.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import org.apache.flink.util.clock.Clock;
+
+/** Class for measuring the throughput based on incoming data size and busy + backpressure time. */

Review comment:
       I wouldn't mention backpressure/idle/busy here; I don't think it's 
necessary at this level of abstraction. Instead of `idleStart()` and 
`idleEnd()` you could just have `pauseMeasurement()` and `startMeasurement()`, 
which would make this class more generic.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -427,6 +428,14 @@ public void setInPhysicalEdges(List<StreamEdge> inEdges) {
         }
     }
 
+    public void setThroughputCalculationPeriod(int throughputCalculationPeriod) {
+        config.set(THROUGHPUT_CALCULATION_PERIOD, throughputCalculationPeriod);
+    }
+
+    public int getThroughputCalculationPeriod() {
+        return config.get(THROUGHPUT_CALCULATION_PERIOD);
+    }

Review comment:
       Why do we need those changes?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/PeriodTimer.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+/** Interface for the measurement of the period of time. */
+public interface PeriodTimer {
+    void markStart();
+
+    void markEnd();
+}

Review comment:
       Extracting/introducing this could also have been a separate, pure 
"refactor" commit.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputMeter.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import org.apache.flink.util.clock.Clock;
+
+/** Class for measuring the throughput based on incoming data size and busy + backpressure time. */
+public class ThroughputMeter {
+    private static final long NOT_TRACKED = -1;
+    private final Clock clock;
+    private final AverageThroughputCalculator averageThroughputCalculator;
+
+    /** Accumulated data size for current period. */
+    private long currentSize;
+
+    /** Total busy time for current period(the total time minus idle time). */
+    private long currentTime;
+
+    /** The start of busy time. */
+    private long startTime;
+
+    /** The last throughput . */
+    private long effectiveThroughput;
+
+    public ThroughputMeter(Clock clock, AverageThroughputCalculator averageThroughputCalculator) {
+        this.clock = clock;
+        this.averageThroughputCalculator = averageThroughputCalculator;
+    }
+
+    /** @param size The size of received data. */
+    public void incomingDataSize(long size) {
+        currentSize += size;
+    }
+
+    /** Mark when idle time is started. */
+    public void idleStart() {
+        if (startTime != NOT_TRACKED) {
+            currentTime += clock.relativeTimeMillis() - startTime;
+        }
+        startTime = NOT_TRACKED;
+    }
+
+    /** Mark when idle time is ended. */
+    public void idleEnd() {
+        startTime = clock.relativeTimeMillis();
+    }
+
+    /**
+     * Calculate throughput based on the collected data for the last period.
+     *
+     * @return {@code -1} if it is impossible to calculate throughput and actual value otherwise.
+     */
+    public long calculateThroughput() {

Review comment:
       The first call to this method will (or might) return a very low 
throughput, as `startTime == 0`. Initialise 
`startTime = clock.relativeTimeMillis()` in the constructor?
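A rough illustration of the effect, under stated assumptions: `ThroughputMeterSketch` is a hypothetical stand-in that takes a `LongSupplier` instead of Flink's `Clock`, and the body of `calculateThroughput()` is a guess, since the diff above does not show it. The fields and the `idleStart()`/`idleEnd()` logic follow the quoted code.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the meter under review; only the constructor differs in intent. */
final class ThroughputMeterSketch {
    private static final long NOT_TRACKED = -1;

    private final LongSupplier clockMillis;
    private long currentSize;
    private long currentTime;
    private long startTime;

    ThroughputMeterSketch(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
        // Start measuring from "now" rather than from the implicit default of 0.
        this.startTime = clockMillis.getAsLong();
    }

    void incomingDataSize(long size) {
        currentSize += size;
    }

    void idleStart() {
        if (startTime != NOT_TRACKED) {
            currentTime += clockMillis.getAsLong() - startTime;
        }
        startTime = NOT_TRACKED;
    }

    void idleEnd() {
        startTime = clockMillis.getAsLong();
    }

    /** Assumed behaviour: bytes per second over the measured time, or -1 if no time was measured. */
    long calculateThroughput() {
        long now = clockMillis.getAsLong();
        if (startTime != NOT_TRACKED) {
            currentTime += now - startTime;
            startTime = now;
        }
        long result = currentTime == 0 ? NOT_TRACKED : currentSize * 1000 / currentTime;
        currentSize = 0;
        currentTime = 0;
        return result;
    }
}
```

If the clock reads 10 000 ms at construction and 500 bytes arrive over the next 500 ms, this sketch reports 1000 bytes/s. With `startTime` left at 0, the same first period would be measured as 10 500 ms and the reported value would drop to 47 bytes/s.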

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/EMAThroughputCalculator.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Implementation of 'Exponential moving average' algorithm. */
+public class EMAThroughputCalculator implements AverageThroughputCalculator {
+    private SMAThroughputCalculator smaThroughputCalculator;

Review comment:
       This field could be `final`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputMeter.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throughput;
+
+import org.apache.flink.util.clock.Clock;
+
+/** Class for measuring the throughput based on incoming data size and busy + backpressure time. */
+public class ThroughputMeter {

Review comment:
       You could have added `ThroughputMeter` (and tests for it?) as an 
independent commit.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
##########
@@ -238,6 +240,19 @@
 
     private static final String SOLUTION_SET_OBJECTS = "itertive.ss.obj";
 
+    public static final ConfigOption<Integer> THROUGHPUT_CALCULATION_PERIOD =
+            ConfigOptions.key("io.throughput.calculationPeriod")
+                    .intType()
+                    .defaultValue(500)
+                    .withDescription("The amount of time between calculation the throughput(ms).");
+
+    public static final ConfigOption<Integer> THROUGHPUT_EMA_LAST_VALUES =

Review comment:
       How about `taskmanager.memory.network.in-flight-data-adjustment.period` 
and `taskmanager.memory.network.in-flight-data-adjustment.samples`, and moving 
both options to `TaskManagerOptions`?
   
   Exposing the configuration could also have been another commit. It would be 
easier to pull in someone else to verify that the config names/placement make 
sense.



