[
https://issues.apache.org/jira/browse/FLINK-13992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zili Chen closed FLINK-13992.
-----------------------------
Resolution: Fixed
> Refactor Optional parameter in InputGateWithMetrics#updateMetrics
> -----------------------------------------------------------------
>
> Key: FLINK-13992
> URL: https://issues.apache.org/jira/browse/FLINK-13992
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zili Chen
> Assignee: Zili Chen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> As consensus from community code style discussion, in
> {{InputGateWithMetrics#updateMetrics}} we can refactor to reduce the usage of
> Optional parameter.
> cc [~azagrebin]
> {code:java}
> diff --git
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
>
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> index 5d2cfd95c4..e548fbf02b 100644
> ---
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> +++
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> @@ -24,6 +24,8 @@ import
> org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
> import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
> import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
>
> +import javax.annotation.Nonnull;
> +
> import java.io.IOException;
> import java.util.Optional;
> import java.util.concurrent.CompletableFuture;
> @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
>
> @Override
> public Optional<BufferOrEvent> getNext() throws IOException,
> InterruptedException {
> - return updateMetrics(inputGate.getNext());
> + return inputGate.getNext().map(this::updateMetrics);
> }
>
> @Override
> public Optional<BufferOrEvent> pollNext() throws IOException,
> InterruptedException {
> - return updateMetrics(inputGate.pollNext());
> + return inputGate.pollNext().map(this::updateMetrics);
> }
>
> @Override
> @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
> inputGate.close();
> }
>
> - private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent>
> bufferOrEvent) {
> - bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
> + private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent
> bufferOrEvent) {
> + numBytesIn.inc(bufferOrEvent.getSize());
> return bufferOrEvent;
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)