Repository: flink Updated Branches: refs/heads/master 363e8d2d6 -> e0bc37bef
[FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient This closes #5573. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0bc37be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0bc37be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0bc37be Branch: refs/heads/master Commit: e0bc37bef69f5376d03214578e9b95816add661b Parents: 363e8d2 Author: vinoyang <[email protected]> Authored: Sat Feb 24 14:50:55 2018 +0800 Committer: Till Rohrmann <[email protected]> Committed: Wed Mar 21 15:10:43 2018 +0100 ---------------------------------------------------------------------- .../client/program/rest/RestClusterClient.java | 41 +++++++++++- .../program/rest/RestClusterClientTest.java | 68 ++++++++++++++++++++ .../handler/job/JobAccumulatorsHandler.java | 35 +++++++--- ...orsIncludeSerializedValueQueryParameter.java | 41 ++++++++++++ .../rest/messages/JobAccumulatorsHeaders.java | 6 +- .../rest/messages/JobAccumulatorsInfo.java | 46 ++++++++++++- .../JobAccumulatorsMessageParameters.java | 36 +++++++++++ .../json/SerializedValueDeserializer.java | 6 ++ .../json/SerializedValueSerializer.java | 6 ++ .../rest/messages/JobAccumulatorsInfoTest.java | 2 +- 10 files changed, 273 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 5558461..f3f1961 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -21,6 +21,7 @@ package org.apache.flink.client.program.rest; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ProgramInvocationException; @@ -52,6 +53,9 @@ import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; @@ -101,6 +105,7 @@ import java.net.URL; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -403,6 +408,40 @@ public class RestClusterClient<T> extends ClusterClient<T> { }); } + @Override + public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception { + final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance(); + final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters(); + accMsgParams.jobPathParameter.resolve(jobID); + accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true)); + + CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest( + accumulatorsHeaders, + accMsgParams + ); + + Map<String, Object> result = Collections.emptyMap(); + + try { + result = responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> { + try { + return AccumulatorHelper.deserializeAccumulators( + accumulatorsInfo.getSerializedUserAccumulators(), + loader); + } catch (Exception e) { + throw new CompletionException( + new FlinkException( + String.format("Deserialization of accumulators for job %s failed.", jobID), + e)); + } + }).get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (ExecutionException ee) { + ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(ee)); + } + + return result; + } + private CompletableFuture<SavepointInfo> pollSavepointAsync( final JobID jobId, final TriggerId triggerID) { @@ -661,7 +700,7 @@ public class RestClusterClient<T> extends ClusterClient<T> { TimeUnit.MILLISECONDS) .thenApplyAsync(leaderAddressSessionId -> { final String address = leaderAddressSessionId.f0; - final Optional<String> host = ScalaUtils.toJava(AddressFromURIString.parse(address).host()); + final Optional<String> host = ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host()); return host.orElseGet(() -> { // if the dispatcher address does not contain a host part, then assume it's running http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index ca2ba22..e108a0b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -47,11 +47,15 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter; import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; @@ -102,8 +106,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -118,6 +124,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -542,6 +549,67 @@ public class RestClusterClientTest extends TestLogger { } } + @Test + public void testGetAccumulators() throws Exception { + TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler(); + + try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){ + + JobID id = new JobID(); + + { + Map<String, Object> accumulators = restClusterClient.getAccumulators(id); + assertNotNull(accumulators); + assertEquals(1, accumulators.size()); + + assertEquals(true, accumulators.containsKey("testKey")); + assertEquals("testValue", accumulators.get("testKey").toString()); + } + } + } + + private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> { + + public TestAccumulatorHandler() { + super(JobAccumulatorsHeaders.getInstance()); + } + + @Override + protected CompletableFuture<JobAccumulatorsInfo> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody, + JobAccumulatorsMessageParameters> request, + @Nonnull DispatcherGateway gateway) throws RestHandlerException { + JobAccumulatorsInfo accumulatorsInfo; + List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class); + + final boolean includeSerializedValue; + if (!queryParams.isEmpty()) { + includeSerializedValue = queryParams.get(0); + } else { + includeSerializedValue = false; + } + + List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(1); + + userTaskAccumulators.add(new JobAccumulatorsInfo.UserTaskAccumulator("testName", "testType", "testValue")); + + if (includeSerializedValue) { + Map<String, SerializedValue<Object>> serializedUserTaskAccumulators = new HashMap<>(1); + try { + serializedUserTaskAccumulators.put("testKey", new SerializedValue<>("testValue")); + } catch (IOException e) { + throw new RuntimeException(e); + } + + accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators); + } else { + accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, Collections.emptyMap()); + } + + return CompletableFuture.completedFuture(accumulatorsInfo); + } + } + private class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> { private TestListJobsHandler() { http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java index 7dd5ff0..0fe9201 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java @@ -24,12 +24,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; -import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.SerializedValue; import java.util.ArrayList; import java.util.Collections; @@ -41,14 +43,14 @@ import java.util.concurrent.Executor; /** * Request handler that returns the aggregated accumulators of a job. */ -public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobMessageParameters> { +public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> { public JobAccumulatorsHandler( CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, - MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> messageHeaders, + MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { super( @@ -62,11 +64,21 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc } @Override - protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException { - StringifiedAccumulatorResult[] accs = graph.getAccumulatorResultsStringified(); - List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(accs.length); + protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException { + JobAccumulatorsInfo accumulatorsInfo; + List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class); - for (StringifiedAccumulatorResult acc : accs) { + final boolean includeSerializedValue; + if (!queryParams.isEmpty()) { + includeSerializedValue = queryParams.get(0); + } else { + includeSerializedValue = false; + } + + StringifiedAccumulatorResult[] stringifiedAccs = graph.getAccumulatorResultsStringified(); + List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(stringifiedAccs.length); + + for (StringifiedAccumulatorResult acc : stringifiedAccs) { userTaskAccumulators.add( new JobAccumulatorsInfo.UserTaskAccumulator( acc.getName(), @@ -74,6 +86,13 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc acc.getValue())); } - return new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators); + if (includeSerializedValue) { + Map<String, SerializedValue<Object>> serializedUserTaskAccumulators = graph.getAccumulatorsSerialized(); + accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators); + } else { + accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, Collections.emptyMap()); + } + + return accumulatorsInfo; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java new file mode 100644 index 0000000..1f685c2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * Query parameter for job's accumulator handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}. + */ +public class AccumulatorsIncludeSerializedValueQueryParameter extends MessageQueryParameter<Boolean> { + + private static final String key = "includeSerializedValue"; + + public AccumulatorsIncludeSerializedValueQueryParameter() { + super(key, MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public String convertValueToString(Boolean value) { + return String.valueOf(value); + } + + @Override + public Boolean convertStringToValue(String value) { + return Boolean.valueOf(value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java index 00f4fd5..2e00c91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java @@ -26,7 +26,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt /** * Message headers for the {@link JobAccumulatorsHandler}. */ -public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> { +public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> { private static final JobAccumulatorsHeaders INSTANCE = new JobAccumulatorsHeaders(); @@ -53,8 +53,8 @@ public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, } @Override - public JobMessageParameters getUnresolvedMessageParameters() { - return new JobMessageParameters(); + public JobAccumulatorsMessageParameters getUnresolvedMessageParameters() { + return new JobAccumulatorsMessageParameters(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java index 367a38b..2262120 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java @@ -19,12 +19,19 @@ package org.apache.flink.runtime.rest.messages; import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler; +import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer; +import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -33,6 +40,7 @@ import java.util.Objects; public class JobAccumulatorsInfo implements ResponseBody { public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators"; public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators"; + public static final String FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS = "serialized-user-task-accumulators"; @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) private List<JobAccumulator> jobAccumulators; @@ -40,12 +48,33 @@ public class JobAccumulatorsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) private List<UserTaskAccumulator> userAccumulators; + @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) + @JsonSerialize(contentUsing = SerializedValueSerializer.class) + private Map<String, SerializedValue<Object>> serializedUserAccumulators; + @JsonCreator public JobAccumulatorsInfo( @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators, - @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators) { + @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators, + @JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<Object>> serializedUserAccumulators) { this.jobAccumulators = Preconditions.checkNotNull(jobAccumulators); this.userAccumulators = Preconditions.checkNotNull(userAccumulators); + this.serializedUserAccumulators = Preconditions.checkNotNull(serializedUserAccumulators); + } + + @JsonIgnore + public List<JobAccumulator> getJobAccumulators() { + return jobAccumulators; + } + + @JsonIgnore + public List<UserTaskAccumulator> getUserAccumulators() { + return userAccumulators; + } + + @JsonIgnore + public Map<String, SerializedValue<Object>> getSerializedUserAccumulators() { + return serializedUserAccumulators; } @Override @@ -104,6 +133,21 @@ public class JobAccumulatorsInfo implements ResponseBody { this.value = Preconditions.checkNotNull(value); } + @JsonIgnore + public String getName() { + return name; + } + + @JsonIgnore + public String getType() { + return type; + } + + @JsonIgnore + public String getValue() { + return value; + } + @Override public boolean equals(Object o) { if (this == o) { http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java new file mode 100644 index 0000000..ef23560 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Request parameter for job accumulator's handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}. + */ +public class JobAccumulatorsMessageParameters extends JobMessageParameters { + + public final AccumulatorsIncludeSerializedValueQueryParameter + includeSerializedAccumulatorsParameter = new AccumulatorsIncludeSerializedValueQueryParameter(); + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.singleton(includeSerializedAccumulatorsParameter); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java index 6a2eadb..d7c321d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java @@ -21,9 +21,11 @@ package org.apache.flink.runtime.rest.messages.json; import org.apache.flink.util.SerializedValue; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; import java.io.IOException; @@ -34,6 +36,10 @@ public class SerializedValueDeserializer extends StdDeserializer<SerializedValue private static final long serialVersionUID = 1L; + public SerializedValueDeserializer() { + super(TypeFactory.defaultInstance().constructType(new TypeReference<SerializedValue<Object>>() {})); + } + public SerializedValueDeserializer(final JavaType valueType) { super(valueType); } http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java index 0383d99..b63b1ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java @@ -21,9 +21,11 @@ package org.apache.flink.runtime.rest.messages.json; import org.apache.flink.util.SerializedValue; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; import java.io.IOException; @@ -36,6 +38,10 @@ public class SerializedValueSerializer extends StdSerializer<SerializedValue<?>> private static final long serialVersionUID = 1L; + public SerializedValueSerializer() { + super(TypeFactory.defaultInstance().constructType(new TypeReference<SerializedValue<Object>>() {})); + } + public SerializedValueSerializer(final JavaType javaType) { super(javaType); } http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java index baaa551..e0e9649 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java @@ -47,6 +47,6 @@ public class JobAccumulatorsInfoTest extends RestResponseMarshallingTestBase<Job "uta3.type", "uta3.value")); - return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList); + return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList, Collections.EMPTY_MAP); } }
