This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8aa9b902eadc74bd3ce3d6f6ef5f0a73bc7331f
Author: tison <[email protected]>
AuthorDate: Fri Nov 29 10:47:31 2019 +0800

    [FLINK-14762][client] Implement JobClient#getAccumulators
---
 .../deployment/ClusterClientJobClientAdapter.java  |  6 +++++
 .../apache/flink/client/program/ClusterClient.java |  5 ++--
 .../flink/client/program/MiniClusterClient.java    | 18 +++++---------
 .../client/program/rest/RestClusterClient.java     | 24 ++++++++-----------
 .../flink/client/program/TestingClusterClient.java |  3 +--
 .../client/program/rest/RestClusterClientTest.java | 19 ++++++---------
 .../api/common/accumulators/AccumulatorHelper.java | 28 ++++++++++++++++++++--
 .../org/apache/flink/core/execution/JobClient.java |  7 ++++++
 .../apache/flink/api/java/TestingJobClient.java    |  6 +++++
 .../streaming/environment/TestingJobClient.java    |  6 +++++
 .../test/accumulators/AccumulatorLiveITCase.java   |  2 +-
 .../utils/SavepointMigrationTestBase.java          | 19 ++++++---------
 12 files changed, 85 insertions(+), 58 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index f31b9b7..91a9c91 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.function.FunctionUtils;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +75,11 @@ public class ClusterClientJobClientAdapter<ClusterID> 
implements JobClient {
        }
 
        @Override
+       public CompletableFuture<Map<String, Object>> 
getAccumulators(ClassLoader classLoader) {
+               return clusterClient.getAccumulators(jobID, classLoader);
+       }
+
+       @Override
        public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(final ClassLoader userClassloader) {
                checkNotNull(userClassloader);
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8208465..eb19109 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OptionalFailure;
 
 import javax.annotation.Nullable;
 
@@ -114,7 +113,7 @@ public interface ClusterClient<T> extends AutoCloseable {
         * @param jobID The job identifier of a job.
         * @return A Map containing the accumulator's name and its value.
         */
-       default CompletableFuture<Map<String, OptionalFailure<Object>>> 
getAccumulators(JobID jobID) {
+       default CompletableFuture<Map<String, Object>> getAccumulators(JobID 
jobID) {
                return getAccumulators(jobID, 
ClassLoader.getSystemClassLoader());
        }
 
@@ -125,7 +124,7 @@ public interface ClusterClient<T> extends AutoCloseable {
         * @param loader The class loader for deserializing the accumulator 
results.
         * @return A Map containing the accumulator's name and its value.
         */
-       CompletableFuture<Map<String, OptionalFailure<Object>>> 
getAccumulators(JobID jobID, ClassLoader loader);
+       CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, 
ClassLoader loader);
 
        /**
         * Cancels a job identified by the job id.
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 1f32c0a..7f47552 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -29,8 +30,6 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +38,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -106,20 +104,16 @@ public class MiniClusterClient implements 
ClusterClient<MiniClusterClient.MiniCl
        }
 
        @Override
-       public CompletableFuture<Map<String, OptionalFailure<Object>>> 
getAccumulators(JobID jobID, ClassLoader loader) {
+       public CompletableFuture<Map<String, Object>> getAccumulators(JobID 
jobID, ClassLoader loader) {
                return miniCluster
                        .getExecutionGraph(jobID)
                        
.thenApply(AccessExecutionGraph::getAccumulatorsSerialized)
                        .thenApply(accumulators -> {
-                               Map<String, OptionalFailure<Object>> result = 
new HashMap<>(accumulators.size());
-                               for (Map.Entry<String, 
SerializedValue<OptionalFailure<Object>>> acc : accumulators.entrySet()) {
-                                       try {
-                                               result.put(acc.getKey(), 
acc.getValue().deserializeValue(loader));
-                                       } catch (Exception e) {
-                                               throw new 
CompletionException("Cannot deserialize accumulators.", e);
-                                       }
+                               try {
+                                       return 
AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader);
+                               } catch (Exception e) {
+                                       throw new CompletionException("Cannot 
deserialize and unwrap accumulators properly.", e);
                                }
-                               return result;
                        });
        }
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index bacbe15..607c320 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -88,7 +88,6 @@ import 
org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
@@ -405,7 +404,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
        }
 
        @Override
-       public CompletableFuture<Map<String, OptionalFailure<Object>>> 
getAccumulators(JobID jobID, ClassLoader loader) {
+       public CompletableFuture<Map<String, Object>> getAccumulators(JobID 
jobID, ClassLoader loader) {
                final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
                final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
                accMsgParams.jobPathParameter.resolve(jobID);
@@ -415,18 +414,15 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                        accumulatorsHeaders,
                        accMsgParams);
 
-               return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
-                       try {
-                               return 
AccumulatorHelper.deserializeAccumulators(
-                                       
accumulatorsInfo.getSerializedUserAccumulators(),
-                                       loader);
-                       } catch (Exception e) {
-                               throw new CompletionException(
-                                       new FlinkException(
-                                               String.format("Deserialization 
of accumulators for job %s failed.", jobID),
-                                               e));
-                       }
-               });
+               return responseFuture
+                       
.thenApply(JobAccumulatorsInfo::getSerializedUserAccumulators)
+                       .thenApply(accumulators -> {
+                               try {
+                                       return 
AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader);
+                               } catch (Exception e) {
+                                       throw new CompletionException("Cannot 
deserialize and unwrap accumulators properly.", e);
+                               }
+                       });
        }
 
        private CompletableFuture<SavepointInfo> pollSavepointAsync(
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index a711c55..f3be219 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.function.TriFunction;
 
 import javax.annotation.Nonnull;
@@ -109,7 +108,7 @@ public class TestingClusterClient<T> implements 
ClusterClient<T> {
        }
 
        @Override
-       public CompletableFuture<Map<String, OptionalFailure<Object>>> 
getAccumulators(JobID jobID, ClassLoader loader) {
+       public CompletableFuture<Map<String, Object>> getAccumulators(JobID 
jobID, ClassLoader loader) {
                throw new UnsupportedOperationException();
        }
 
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 16599c5..d3a381c 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -131,6 +131,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -512,21 +513,15 @@ public class RestClusterClientTest extends TestLogger {
                TestAccumulatorHandler accumulatorHandler = new 
TestAccumulatorHandler();
 
                try (TestRestServerEndpoint restServerEndpoint = 
createRestServerEndpoint(accumulatorHandler)){
-                       RestClusterClient<?> restClusterClient = 
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-                       try {
+                       try (RestClusterClient<?> restClusterClient = 
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
                                JobID id = new JobID();
+                               Map<String, Object> accumulators = 
restClusterClient.getAccumulators(id).get();
+                               assertNotNull(accumulators);
+                               assertEquals(1, accumulators.size());
 
-                               {
-                                       Map<String, OptionalFailure<Object>> 
accumulators = restClusterClient.getAccumulators(id).get();
-                                       assertNotNull(accumulators);
-                                       assertEquals(1, accumulators.size());
-
-                                       assertEquals(true, 
accumulators.containsKey("testKey"));
-                                       assertEquals("testValue", 
accumulators.get("testKey").get().toString());
-                               }
-                       } finally {
-                               restClusterClient.close();
+                               assertTrue(accumulators.containsKey("testKey"));
+                               assertEquals("testValue", 
accumulators.get("testKey").toString());
                        }
                }
        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 9bc1299..b6d3971 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -178,8 +178,6 @@ public class AccumulatorHelper {
         * @param serializedAccumulators The serialized accumulator results.
         * @param loader The class loader to use.
         * @return The deserialized accumulator results.
-        * @throws IOException
-        * @throws ClassNotFoundException
         */
        public static Map<String, OptionalFailure<Object>> 
deserializeAccumulators(
                        Map<String, SerializedValue<OptionalFailure<Object>>> 
serializedAccumulators,
@@ -203,4 +201,30 @@ public class AccumulatorHelper {
 
                return accumulators;
        }
+
+       /**
+        * Takes the serialized accumulator results and tries to deserialize 
them using the provided
+        * class loader, and then tries to unwrap the value unchecked.
+        * @param serializedAccumulators The serialized accumulator results.
+        * @param loader The class loader to use.
+        * @return The deserialized and unwrapped accumulator results.
+        */
+       public static Map<String, Object> deserializeAndUnwrapAccumulators(
+               Map<String, SerializedValue<OptionalFailure<Object>>> 
serializedAccumulators,
+               ClassLoader loader) throws IOException, ClassNotFoundException {
+
+               Map<String, OptionalFailure<Object>> deserializedAccumulators = 
deserializeAccumulators(serializedAccumulators, loader);
+
+               if (deserializedAccumulators.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               Map<String, Object> accumulators = new 
HashMap<>(serializedAccumulators.size());
+
+               for (Map.Entry<String, OptionalFailure<Object>> entry : 
deserializedAccumulators.entrySet()) {
+                       accumulators.put(entry.getKey(), 
entry.getValue().getUnchecked());
+               }
+
+               return accumulators;
+       }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index d514bc4..5bec503 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -65,6 +66,12 @@ public interface JobClient extends AutoCloseable {
        CompletableFuture<String> triggerSavepoint(@Nullable String 
savepointDirectory);
 
        /**
+        * Requests the accumulators of the associated job. Accumulators can be 
requested while it is running
+        * or after it has finished. The class loader is used to deserialize 
the incoming accumulator results.
+        */
+       CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader 
classLoader);
+
+       /**
         * Returns the {@link JobExecutionResult result of the job execution} 
of the submitted job.
         *
         * @param userClassloader the classloader used to de-serialize the 
accumulators of the job.
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java 
b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
index d1725a5..b5c9873 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient {
                return CompletableFuture.completedFuture("null");
        }
 
+       @Override
+       public CompletableFuture<Map<String, Object>> 
getAccumulators(ClassLoader classLoader) {
+               return 
CompletableFuture.completedFuture(Collections.emptyMap());
+       }
+
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
index ede0e19..bcc02ee 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient {
                return CompletableFuture.completedFuture("null");
        }
 
+       @Override
+       public CompletableFuture<Map<String, Object>> 
getAccumulators(ClassLoader classLoader) {
+               return 
CompletableFuture.completedFuture(Collections.emptyMap());
+       }
+
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 20195d5..32e41e7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -169,7 +169,7 @@ public class AccumulatorLiveITCase extends TestLogger {
                                deadline,
                                accumulators -> accumulators.size() == 1
                                        && 
accumulators.containsKey(ACCUMULATOR_NAME)
-                                       && (int) 
accumulators.get(ACCUMULATOR_NAME).getUnchecked() == NUM_ITERATIONS,
+                                       && (int) 
accumulators.get(ACCUMULATOR_NAME) == NUM_ITERATIONS,
                                TestingUtils.defaultScheduledExecutor()
                        ).get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 3c2446f..2436cb8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -38,7 +38,6 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
-import org.apache.flink.util.OptionalFailure;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.BeforeClass;
@@ -58,8 +57,8 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import static junit.framework.Assert.fail;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Test savepoint migration.
@@ -146,21 +145,17 @@ public abstract class SavepointMigrationTestBase extends 
TestBaseUtils {
                boolean done = false;
                while (deadLine.hasTimeLeft()) {
                        Thread.sleep(100);
-                       Map<String, OptionalFailure<Object>> accumulators = 
client.getAccumulators(jobSubmissionResult.getJobID()).get();
+                       Map<String, Object> accumulators = 
client.getAccumulators(jobSubmissionResult.getJobID()).get();
 
                        boolean allDone = true;
                        for (Tuple2<String, Integer> acc : 
expectedAccumulators) {
-                               OptionalFailure<Object> accumOpt = 
accumulators.get(acc.f0);
+                               Object accumOpt = accumulators.get(acc.f0);
                                if (accumOpt == null) {
                                        allDone = false;
                                        break;
                                }
 
-                               Integer numFinished = (Integer) accumOpt.get();
-                               if (numFinished == null) {
-                                       allDone = false;
-                                       break;
-                               }
+                               Integer numFinished = (Integer) accumOpt;
                                if (!numFinished.equals(acc.f1)) {
                                        allDone = false;
                                        break;
@@ -226,16 +221,16 @@ public abstract class SavepointMigrationTestBase extends 
TestBaseUtils {
                        }
 
                        Thread.sleep(100);
-                       Map<String, OptionalFailure<Object>> accumulators = 
client.getAccumulators(jobId).get();
+                       Map<String, Object> accumulators = 
client.getAccumulators(jobId).get();
 
                        boolean allDone = true;
                        for (Tuple2<String, Integer> acc : 
expectedAccumulators) {
-                               OptionalFailure<Object> numFinished = 
accumulators.get(acc.f0);
+                               Object numFinished = accumulators.get(acc.f0);
                                if (numFinished == null) {
                                        allDone = false;
                                        break;
                                }
-                               if (!numFinished.get().equals(acc.f1)) {
+                               if (!numFinished.equals(acc.f1)) {
                                        allDone = false;
                                        break;
                                }

Reply via email to