Repository: flink
Updated Branches:
  refs/heads/master 363e8d2d6 -> e0bc37bef


[FLINK-8756][Client] Support ClusterClient.getAccumulators() in 
RestClusterClient

This closes #5573.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0bc37be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0bc37be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0bc37be

Branch: refs/heads/master
Commit: e0bc37bef69f5376d03214578e9b95816add661b
Parents: 363e8d2
Author: vinoyang <[email protected]>
Authored: Sat Feb 24 14:50:55 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Wed Mar 21 15:10:43 2018 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 41 +++++++++++-
 .../program/rest/RestClusterClientTest.java     | 68 ++++++++++++++++++++
 .../handler/job/JobAccumulatorsHandler.java     | 35 +++++++---
 ...orsIncludeSerializedValueQueryParameter.java | 41 ++++++++++++
 .../rest/messages/JobAccumulatorsHeaders.java   |  6 +-
 .../rest/messages/JobAccumulatorsInfo.java      | 46 ++++++++++++-
 .../JobAccumulatorsMessageParameters.java       | 36 +++++++++++
 .../json/SerializedValueDeserializer.java       |  6 ++
 .../json/SerializedValueSerializer.java         |  6 ++
 .../rest/messages/JobAccumulatorsInfoTest.java  |  2 +-
 10 files changed, 273 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 5558461..f3f1961 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program.rest;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -52,6 +53,9 @@ import 
org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -101,6 +105,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -403,6 +408,40 @@ public class RestClusterClient<T> extends ClusterClient<T> 
{
                });
        }
 
+       @Override
+       public Map<String, Object> getAccumulators(final JobID jobID, 
ClassLoader loader) throws Exception {
+               final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+               final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+               accMsgParams.jobPathParameter.resolve(jobID);
+               
accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
+
+               CompletableFuture<JobAccumulatorsInfo> responseFuture = 
sendRequest(
+                       accumulatorsHeaders,
+                       accMsgParams
+               );
+
+               Map<String, Object> result = Collections.emptyMap();
+
+               try {
+                       result = responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+                               try {
+                                       return 
AccumulatorHelper.deserializeAccumulators(
+                                               
accumulatorsInfo.getSerializedUserAccumulators(),
+                                               loader);
+                               } catch (Exception e) {
+                                       throw new CompletionException(
+                                               new FlinkException(
+                                                       
String.format("Deserialization of accumulators for job %s failed.", jobID),
+                                                       e));
+                               }
+                       }).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+               } catch (ExecutionException ee) {
+                       
ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(ee));
+               }
+
+               return result;
+       }
+
        private CompletableFuture<SavepointInfo> pollSavepointAsync(
                        final JobID jobId,
                        final TriggerId triggerID) {
@@ -661,7 +700,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
                                TimeUnit.MILLISECONDS)
                        .thenApplyAsync(leaderAddressSessionId -> {
                                final String address = 
leaderAddressSessionId.f0;
-                               final Optional<String> host = 
ScalaUtils.toJava(AddressFromURIString.parse(address).host());
+                               final Optional<String> host = 
ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host());
 
                                return host.orElseGet(() -> {
                                        // if the dispatcher address does not 
contain a host part, then assume it's running

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index ca2ba22..e108a0b 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -47,11 +47,15 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import 
org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -102,8 +106,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -118,6 +124,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -542,6 +549,67 @@ public class RestClusterClientTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testGetAccumulators() throws Exception {
+               TestAccumulatorHandler accumulatorHandler = new 
TestAccumulatorHandler();
+
+               try (TestRestServerEndpoint ignored = 
createRestServerEndpoint(accumulatorHandler)){
+
+                       JobID id = new JobID();
+
+                       {
+                               Map<String, Object> accumulators = 
restClusterClient.getAccumulators(id);
+                               assertNotNull(accumulators);
+                               assertEquals(1, accumulators.size());
+
+                               assertEquals(true, 
accumulators.containsKey("testKey"));
+                               assertEquals("testValue", 
accumulators.get("testKey").toString());
+                       }
+               }
+       }
+
+       private class TestAccumulatorHandler extends 
TestHandler<EmptyRequestBody, JobAccumulatorsInfo, 
JobAccumulatorsMessageParameters> {
+
+               public TestAccumulatorHandler() {
+                       super(JobAccumulatorsHeaders.getInstance());
+               }
+
+               @Override
+               protected CompletableFuture<JobAccumulatorsInfo> handleRequest(
+                       @Nonnull HandlerRequest<EmptyRequestBody,
+                               JobAccumulatorsMessageParameters> request,
+                       @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+                       JobAccumulatorsInfo accumulatorsInfo;
+                       List<Boolean> queryParams = 
request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
+
+                       final boolean includeSerializedValue;
+                       if (!queryParams.isEmpty()) {
+                               includeSerializedValue = queryParams.get(0);
+                       } else {
+                               includeSerializedValue = false;
+                       }
+
+                       List<JobAccumulatorsInfo.UserTaskAccumulator> 
userTaskAccumulators = new ArrayList<>(1);
+
+                       userTaskAccumulators.add(new 
JobAccumulatorsInfo.UserTaskAccumulator("testName", "testType", "testValue"));
+
+                       if (includeSerializedValue) {
+                               Map<String, SerializedValue<Object>> 
serializedUserTaskAccumulators = new HashMap<>(1);
+                               try {
+                                       
serializedUserTaskAccumulators.put("testKey", new 
SerializedValue<>("testValue"));
+                               } catch (IOException e) {
+                                       throw new RuntimeException(e);
+                               }
+
+                               accumulatorsInfo = new 
JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, 
serializedUserTaskAccumulators);
+                       } else {
+                               accumulatorsInfo = new 
JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, 
Collections.emptyMap());
+                       }
+
+                       return 
CompletableFuture.completedFuture(accumulatorsInfo);
+               }
+       }
+
        private class TestListJobsHandler extends TestHandler<EmptyRequestBody, 
MultipleJobsDetails, EmptyMessageParameters> {
 
                private TestListJobsHandler() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 7dd5ff0..0fe9201 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -24,12 +24,14 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,14 +43,14 @@ import java.util.concurrent.Executor;
 /**
  * Request handler that returns the aggregated accumulators of a job.
  */
-public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobMessageParameters> {
+public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAccumulatorsInfo, 
JobAccumulatorsMessageParameters> {
 
        public JobAccumulatorsHandler(
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
                        Map<String, String> responseHeaders,
-                       MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, 
JobMessageParameters> messageHeaders,
+                       MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, 
JobAccumulatorsMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
                super(
@@ -62,11 +64,21 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAcc
        }
 
        @Override
-       protected JobAccumulatorsInfo 
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, 
AccessExecutionGraph graph) throws RestHandlerException {
-               StringifiedAccumulatorResult[] accs = 
graph.getAccumulatorResultsStringified();
-               List<JobAccumulatorsInfo.UserTaskAccumulator> 
userTaskAccumulators = new ArrayList<>(accs.length);
+       protected JobAccumulatorsInfo 
handleRequest(HandlerRequest<EmptyRequestBody, 
JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws 
RestHandlerException {
+               JobAccumulatorsInfo accumulatorsInfo;
+               List<Boolean> queryParams = 
request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
 
-               for (StringifiedAccumulatorResult acc : accs) {
+               final boolean includeSerializedValue;
+               if (!queryParams.isEmpty()) {
+                       includeSerializedValue = queryParams.get(0);
+               } else {
+                       includeSerializedValue = false;
+               }
+
+               StringifiedAccumulatorResult[] stringifiedAccs = 
graph.getAccumulatorResultsStringified();
+               List<JobAccumulatorsInfo.UserTaskAccumulator> 
userTaskAccumulators = new ArrayList<>(stringifiedAccs.length);
+
+               for (StringifiedAccumulatorResult acc : stringifiedAccs) {
                        userTaskAccumulators.add(
                                new JobAccumulatorsInfo.UserTaskAccumulator(
                                        acc.getName(),
@@ -74,6 +86,13 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAcc
                                        acc.getValue()));
                }
 
-               return new JobAccumulatorsInfo(Collections.emptyList(), 
userTaskAccumulators);
+               if (includeSerializedValue) {
+                       Map<String, SerializedValue<Object>> 
serializedUserTaskAccumulators = graph.getAccumulatorsSerialized();
+                       accumulatorsInfo = new 
JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, 
serializedUserTaskAccumulators);
+               } else {
+                       accumulatorsInfo = new 
JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, 
Collections.emptyMap());
+               }
+
+               return accumulatorsInfo;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
new file mode 100644
index 0000000..1f685c2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class AccumulatorsIncludeSerializedValueQueryParameter extends 
MessageQueryParameter<Boolean> {
+
+       private static final String key = "includeSerializedValue";
+
+       public AccumulatorsIncludeSerializedValueQueryParameter() {
+               super(key, MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public String convertValueToString(Boolean value) {
+               return String.valueOf(value);
+       }
+
+       @Override
+       public Boolean convertStringToValue(String value) {
+               return Boolean.valueOf(value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
index 00f4fd5..2e00c91 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 /**
  * Message headers for the {@link JobAccumulatorsHandler}.
  */
-public class JobAccumulatorsHeaders implements 
MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> {
+public class JobAccumulatorsHeaders implements 
MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, 
JobAccumulatorsMessageParameters> {
 
        private static final JobAccumulatorsHeaders INSTANCE = new 
JobAccumulatorsHeaders();
 
@@ -53,8 +53,8 @@ public class JobAccumulatorsHeaders implements 
MessageHeaders<EmptyRequestBody,
        }
 
        @Override
-       public JobMessageParameters getUnresolvedMessageParameters() {
-               return new JobMessageParameters();
+       public JobAccumulatorsMessageParameters 
getUnresolvedMessageParameters() {
+               return new JobAccumulatorsMessageParameters();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
index 367a38b..2262120 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
@@ -19,12 +19,19 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -33,6 +40,7 @@ import java.util.Objects;
 public class JobAccumulatorsInfo implements ResponseBody {
        public static final String FIELD_NAME_JOB_ACCUMULATORS = 
"job-accumulators";
        public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = 
"user-task-accumulators";
+       public static final String FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS 
= "serialized-user-task-accumulators";
 
        @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
        private List<JobAccumulator> jobAccumulators;
@@ -40,12 +48,33 @@ public class JobAccumulatorsInfo implements ResponseBody {
        @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
        private List<UserTaskAccumulator> userAccumulators;
 
+       @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
+       @JsonSerialize(contentUsing = SerializedValueSerializer.class)
+       private Map<String, SerializedValue<Object>> serializedUserAccumulators;
+
        @JsonCreator
        public JobAccumulatorsInfo(
                        @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) 
List<JobAccumulator> jobAccumulators,
-                       @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List<UserTaskAccumulator> userAccumulators) {
+                       @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List<UserTaskAccumulator> userAccumulators,
+                       @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, 
SerializedValue<Object>> serializedUserAccumulators) {
                this.jobAccumulators = 
Preconditions.checkNotNull(jobAccumulators);
                this.userAccumulators = 
Preconditions.checkNotNull(userAccumulators);
+               this.serializedUserAccumulators = 
Preconditions.checkNotNull(serializedUserAccumulators);
+       }
+
+       @JsonIgnore
+       public List<JobAccumulator> getJobAccumulators() {
+               return jobAccumulators;
+       }
+
+       @JsonIgnore
+       public List<UserTaskAccumulator> getUserAccumulators() {
+               return userAccumulators;
+       }
+
+       @JsonIgnore
+       public Map<String, SerializedValue<Object>> 
getSerializedUserAccumulators() {
+               return serializedUserAccumulators;
        }
 
        @Override
@@ -104,6 +133,21 @@ public class JobAccumulatorsInfo implements ResponseBody {
                        this.value = Preconditions.checkNotNull(value);
                }
 
+               @JsonIgnore
+               public String getName() {
+                       return name;
+               }
+
+               @JsonIgnore
+               public String getType() {
+                       return type;
+               }
+
+               @JsonIgnore
+               public String getValue() {
+                       return value;
+               }
+
                @Override
                public boolean equals(Object o) {
                        if (this == o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
new file mode 100644
index 0000000..ef23560
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Request parameter for job accumulator's handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsMessageParameters extends JobMessageParameters {
+
+       public final AccumulatorsIncludeSerializedValueQueryParameter
+               includeSerializedAccumulatorsParameter = new 
AccumulatorsIncludeSerializedValueQueryParameter();
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return 
Collections.singleton(includeSerializedAccumulatorsParameter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
index 6a2eadb..d7c321d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.rest.messages.json;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
 
 import java.io.IOException;
 
@@ -34,6 +36,10 @@ public class SerializedValueDeserializer extends 
StdDeserializer<SerializedValue
 
        private static final long serialVersionUID = 1L;
 
+       public SerializedValueDeserializer() {
+               super(TypeFactory.defaultInstance().constructType(new 
TypeReference<SerializedValue<Object>>() {}));
+       }
+
        public SerializedValueDeserializer(final JavaType valueType) {
                super(valueType);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
index 0383d99..b63b1ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.rest.messages.json;
 import org.apache.flink.util.SerializedValue;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
 
 import java.io.IOException;
 
@@ -36,6 +38,10 @@ public class SerializedValueSerializer extends 
StdSerializer<SerializedValue<?>>
 
        private static final long serialVersionUID = 1L;
 
+       public SerializedValueSerializer() {
+               super(TypeFactory.defaultInstance().constructType(new 
TypeReference<SerializedValue<Object>>() {}));
+       }
+
        public SerializedValueSerializer(final JavaType javaType) {
                super(javaType);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0bc37be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
index baaa551..e0e9649 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
@@ -47,6 +47,6 @@ public class JobAccumulatorsInfoTest extends 
RestResponseMarshallingTestBase<Job
                        "uta3.type",
                        "uta3.value"));
 
-               return new JobAccumulatorsInfo(Collections.emptyList(), 
userAccumulatorList);
+               return new JobAccumulatorsInfo(Collections.emptyList(), 
userAccumulatorList, Collections.EMPTY_MAP);
        }
 }

Reply via email to