This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d8aa9b902eadc74bd3ce3d6f6ef5f0a73bc7331f Author: tison <[email protected]> AuthorDate: Fri Nov 29 10:47:31 2019 +0800 [FLINK-14762][client] Implement JobClient#getAccumulators --- .../deployment/ClusterClientJobClientAdapter.java | 6 +++++ .../apache/flink/client/program/ClusterClient.java | 5 ++-- .../flink/client/program/MiniClusterClient.java | 18 +++++--------- .../client/program/rest/RestClusterClient.java | 24 ++++++++----------- .../flink/client/program/TestingClusterClient.java | 3 +-- .../client/program/rest/RestClusterClientTest.java | 19 ++++++--------- .../api/common/accumulators/AccumulatorHelper.java | 28 ++++++++++++++++++++-- .../org/apache/flink/core/execution/JobClient.java | 7 ++++++ .../apache/flink/api/java/TestingJobClient.java | 6 +++++ .../streaming/environment/TestingJobClient.java | 6 +++++ .../test/accumulators/AccumulatorLiveITCase.java | 2 +- .../utils/SavepointMigrationTestBase.java | 19 ++++++--------- 12 files changed, 85 insertions(+), 58 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java index f31b9b7..91a9c91 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java @@ -31,6 +31,7 @@ import org.apache.flink.util.function.FunctionUtils; import javax.annotation.Nullable; import java.io.IOException; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,6 +75,11 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { } @Override + public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) { + return clusterClient.getAccumulators(jobID, classLoader); + } + + @Override public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) { checkNotNull(userClassloader); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 8208465..eb19109 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.OptionalFailure; import javax.annotation.Nullable; @@ -114,7 +113,7 @@ public interface ClusterClient<T> extends AutoCloseable { * @param jobID The job identifier of a job. * @return A Map containing the accumulator's name and its value. */ - default CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID) { + default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) { return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); } @@ -125,7 +124,7 @@ public interface ClusterClient<T> extends AutoCloseable { * @param loader The class loader for deserializing the accumulator results. * @return A Map containing the accumulator's name and its value. */ - CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader); + CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader); /** * Cancels a job identified by the job id. diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 1f32c0a..7f47552 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -20,6 +20,7 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; @@ -29,8 +30,6 @@ import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.OptionalFailure; -import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +38,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -106,20 +104,16 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl } @Override - public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) { + public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) { return miniCluster .getExecutionGraph(jobID) .thenApply(AccessExecutionGraph::getAccumulatorsSerialized) .thenApply(accumulators -> { - Map<String, OptionalFailure<Object>> result = new HashMap<>(accumulators.size()); - for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> acc : accumulators.entrySet()) { - try { - result.put(acc.getKey(), acc.getValue().deserializeValue(loader)); - } catch (Exception e) { - throw new CompletionException("Cannot deserialize accumulators.", e); - } + try { + return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader); + } catch (Exception e) { + throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e); } - return result; }); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index bacbe15..607c320 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -88,7 +88,6 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; @@ -405,7 +404,7 @@ public class RestClusterClient<T> implements ClusterClient<T> { } @Override - public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) { + public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) { final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance(); final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters(); accMsgParams.jobPathParameter.resolve(jobID); @@ -415,18 +414,15 @@ public class RestClusterClient<T> implements ClusterClient<T> { accumulatorsHeaders, accMsgParams); - return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> { - try { - return AccumulatorHelper.deserializeAccumulators( - accumulatorsInfo.getSerializedUserAccumulators(), - loader); - } catch (Exception e) { - throw new CompletionException( - new FlinkException( - String.format("Deserialization of accumulators for job %s failed.", jobID), - e)); - } - }); + return responseFuture + .thenApply(JobAccumulatorsInfo::getSerializedUserAccumulators) + .thenApply(accumulators -> { + try { + return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader); + } catch (Exception e) { + throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e); + } + }); } private CompletableFuture<SavepointInfo> pollSavepointAsync( diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java index a711c55..f3be219 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.function.TriFunction; import javax.annotation.Nonnull; @@ -109,7 +108,7 @@ public class TestingClusterClient<T> implements ClusterClient<T> { } @Override - public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) { + public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) { throw new UnsupportedOperationException(); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 16599c5..d3a381c 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -131,6 +131,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -512,21 +513,15 @@ public class RestClusterClientTest extends TestLogger { TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler(); try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)){ - RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); - try { + try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { JobID id = new JobID(); + Map<String, Object> accumulators = restClusterClient.getAccumulators(id).get(); + assertNotNull(accumulators); + assertEquals(1, accumulators.size()); - { - Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id).get(); - assertNotNull(accumulators); - assertEquals(1, accumulators.size()); - - assertEquals(true, accumulators.containsKey("testKey")); - assertEquals("testValue", accumulators.get("testKey").get().toString()); - } - } finally { - restClusterClient.close(); + assertTrue(accumulators.containsKey("testKey")); + assertEquals("testValue", accumulators.get("testKey").toString()); } } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java index 9bc1299..b6d3971 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java @@ -178,8 +178,6 @@ public class AccumulatorHelper { * @param serializedAccumulators The serialized accumulator results. * @param loader The class loader to use. * @return The deserialized accumulator results. - * @throws IOException - * @throws ClassNotFoundException */ public static Map<String, OptionalFailure<Object>> deserializeAccumulators( Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators, @@ -203,4 +201,30 @@ public class AccumulatorHelper { return accumulators; } + + /** + * Takes the serialized accumulator results and tries to deserialize them using the provided + * class loader, and then try to unwrap the value unchecked. + * @param serializedAccumulators The serialized accumulator results. + * @param loader The class loader to use. + * @return The deserialized and unwrapped accumulator results. + */ + public static Map<String, Object> deserializeAndUnwrapAccumulators( + Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators, + ClassLoader loader) throws IOException, ClassNotFoundException { + + Map<String, OptionalFailure<Object>> deserializedAccumulators = deserializeAccumulators(serializedAccumulators, loader); + + if (deserializedAccumulators.isEmpty()) { + return Collections.emptyMap(); + } + + Map<String, Object> accumulators = new HashMap<>(serializedAccumulators.size()); + + for (Map.Entry<String, OptionalFailure<Object>> entry : deserializedAccumulators.entrySet()) { + accumulators.put(entry.getKey(), entry.getValue().getUnchecked()); + } + + return accumulators; + } } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java index d514bc4..5bec503 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID; import javax.annotation.Nullable; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -65,6 +66,12 @@ public interface JobClient extends AutoCloseable { CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory); /** + * Requests the accumulators of the associated job. Accumulators can be requested while it is running + * or after it has finished. The class loader is used to deserialize the incoming accumulator results. + */ + CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader); + + /** * Returns the {@link JobExecutionResult result of the job execution} of the submitted job. * * @param userClassloader the classloader used to de-serialize the accumulators of the job. diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java index d1725a5..b5c9873 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java @@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient; import javax.annotation.Nullable; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient { return CompletableFuture.completedFuture("null"); } + @Override + public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) { + return CompletableFuture.completedFuture(Collections.emptyMap()); + } + } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java index ede0e19..bcc02ee 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java @@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient; import javax.annotation.Nullable; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient { return CompletableFuture.completedFuture("null"); } + @Override + public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) { + return CompletableFuture.completedFuture(Collections.emptyMap()); + } + } diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 20195d5..32e41e7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -169,7 +169,7 @@ public class AccumulatorLiveITCase extends TestLogger { deadline, accumulators -> accumulators.size() == 1 && accumulators.containsKey(ACCUMULATOR_NAME) - && (int) accumulators.get(ACCUMULATOR_NAME).getUnchecked() == NUM_ITERATIONS, + && (int) accumulators.get(ACCUMULATOR_NAME) == NUM_ITERATIONS, TestingUtils.defaultScheduledExecutor() ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index 3c2446f..2436cb8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG; -import org.apache.flink.util.OptionalFailure; import org.apache.commons.io.FileUtils; import org.junit.BeforeClass; @@ -58,8 +57,8 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import static junit.framework.Assert.fail; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; /** * Test savepoint migration. @@ -146,21 +145,17 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { boolean done = false; while (deadLine.hasTimeLeft()) { Thread.sleep(100); - Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get(); + Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get(); boolean allDone = true; for (Tuple2<String, Integer> acc : expectedAccumulators) { - OptionalFailure<Object> accumOpt = accumulators.get(acc.f0); + Object accumOpt = accumulators.get(acc.f0); if (accumOpt == null) { allDone = false; break; } - Integer numFinished = (Integer) accumOpt.get(); - if (numFinished == null) { - allDone = false; - break; - } + Integer numFinished = (Integer) accumOpt; if (!numFinished.equals(acc.f1)) { allDone = false; break; @@ -226,16 +221,16 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { } Thread.sleep(100); - Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobId).get(); + Map<String, Object> accumulators = client.getAccumulators(jobId).get(); boolean allDone = true; for (Tuple2<String, Integer> acc : expectedAccumulators) { - OptionalFailure<Object> numFinished = accumulators.get(acc.f0); + Object numFinished = accumulators.get(acc.f0); if (numFinished == null) { allDone = false; break; } - if (!numFinished.get().equals(acc.f1)) { + if (!numFinished.equals(acc.f1)) { allDone = false; break; }
