This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 27a1c41  [FLINK-12822] Add explicit transformer from 
SerializableOptional to Optional
27a1c41 is described below

commit 27a1c415d39d07790c322fe12d7c4e8075477cea
Author: tison <wander4...@gmail.com>
AuthorDate: Thu Jun 13 10:39:05 2019 +0800

    [FLINK-12822] Add explicit transformer from SerializableOptional to Optional
    
    This closes #8724.
---
 .../main/java/org/apache/flink/types/SerializableOptional.java | 10 +++++++---
 .../apache/flink/runtime/resourcemanager/ResourceManager.java  |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java 
b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
index 89dcea4..d620e97 100644
--- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
+++ b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
@@ -59,14 +59,18 @@ public final class SerializableOptional<T extends 
Serializable> implements Seria
                }
        }
 
-       public <R> Optional<R> map(Function<? super T, ? extends R> mapper) {
+       public <R extends Serializable> SerializableOptional<R> map(Function<? 
super T, ? extends R> mapper) {
                if (value == null) {
-                       return Optional.empty();
+                       return empty();
                } else {
-                       return Optional.ofNullable(mapper.apply(value));
+                       return ofNullable(mapper.apply(value));
                }
        }
 
+       public Optional<T> toOptional() {
+               return Optional.ofNullable(value);
+       }
+
        public static <T extends Serializable> SerializableOptional<T> 
of(@Nonnull T value) {
                return new SerializableOptional<>(value);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 03e1d87..441cbbd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -588,7 +588,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
 
                        final CompletableFuture<Optional<Tuple2<ResourceID, 
String>>> metricQueryServiceAddressFuture = taskExecutorGateway
                                .requestMetricQueryServiceAddress(timeout)
-                               .thenApply(optional -> optional.map(path -> 
Tuple2.of(tmResourceId, path)));
+                               .thenApply(o -> o.toOptional().map(address -> 
Tuple2.of(tmResourceId, address)));
 
                        
metricQueryServiceAddressFutures.add(metricQueryServiceAddressFuture);
                }

Reply via email to