[FLINK-3126] [core] Remove accumulator type from "value" in web frontend

This closes #1868


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db85f385
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db85f385
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db85f385

Branch: refs/heads/master
Commit: db85f385846a6541c0707ed1ba6fed78446423b5
Parents: 342db48
Author: Zack Pierce <[email protected]>
Authored: Mon Apr 11 10:17:34 2016 -0700
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 13 01:10:55 2016 +0200

----------------------------------------------------------------------
 .../StringifiedAccumulatorResult.java           |  14 +-
 .../StringifiedAccumulatorResultTest.java       | 138 +++++++++++++++++++
 2 files changed, 149 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
index a0d1eda..c4faad1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
@@ -55,6 +55,9 @@ public class StringifiedAccumulatorResult implements 
java.io.Serializable{
        //  Utilities
        // 
------------------------------------------------------------------------
 
+       /**
+        * Flatten a map of accumulator names to Accumulator instances into an 
array of StringifiedAccumulatorResult values
+     */
        public static StringifiedAccumulatorResult[] 
stringifyAccumulatorResults(Map<String, Accumulator<?, ?>> accs) {
                if (accs == null || accs.isEmpty()) {
                        return new StringifiedAccumulatorResult[0];
@@ -65,9 +68,14 @@ public class StringifiedAccumulatorResult implements 
java.io.Serializable{
                        int i = 0;
                        for (Map.Entry<String, Accumulator<?, ?>> entry : 
accs.entrySet()) {
                                StringifiedAccumulatorResult result;
-                               Accumulator<?, ?> value = entry.getValue();
-                               if (value != null) {
-                                       result = new 
StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), 
value.toString());
+                               Accumulator<?, ?> accumulator = 
entry.getValue();
+                               if (accumulator != null) {
+                                       Object localValue = 
accumulator.getLocalValue();
+                                       if (localValue != null) {
+                                               result = new 
StringifiedAccumulatorResult(entry.getKey(), 
accumulator.getClass().getSimpleName(), localValue.toString());
+                                       } else {
+                                               result = new 
StringifiedAccumulatorResult(entry.getKey(), 
accumulator.getClass().getSimpleName(), "null");
+                                       }
                                } else {
                                        result = new 
StringifiedAccumulatorResult(entry.getKey(), "null", "null");
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
new file mode 100644
index 0000000..e6d637b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.SimpleAccumulator;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class StringifiedAccumulatorResultTest {
+
+       @Test
+       public void testSerialization() throws IOException {
+               final String name = "a";
+               final String type = "b";
+               final String value = "c";
+               final StringifiedAccumulatorResult original = new 
StringifiedAccumulatorResult(name, type, value);
+
+               // Confirm no funny business in the constructor to getter 
pathway
+               assertEquals(name, original.getName());
+               assertEquals(type, original.getType());
+               assertEquals(value, original.getValue());
+
+               final StringifiedAccumulatorResult copy = 
CommonTestUtils.createCopySerializable(original);
+
+               // Copy should have equivalent core fields
+               assertEquals(name, copy.getName());
+               assertEquals(type, copy.getType());
+               assertEquals(value, copy.getValue());
+       }
+
+       @Test
+       public void 
stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() {
+               final String name = "a";
+               final int targetValue = 314159;
+               final IntCounter acc = new IntCounter();
+               acc.add(targetValue);
+               final Map<String, Accumulator<?, ?>> accumulatorMap = new 
HashMap<>();
+               accumulatorMap.put(name, acc);
+
+
+               final StringifiedAccumulatorResult[] results = 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
+
+               assertEquals(1, results.length);
+
+               final StringifiedAccumulatorResult firstResult = results[0];
+               assertEquals(name, firstResult.getName());
+               assertEquals("IntCounter", firstResult.getType());
+               assertEquals(Integer.toString(targetValue), 
firstResult.getValue());
+       }
+
+       @Test
+       public void 
stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() {
+               final String name = "a";
+               final NullBearingAccumulator acc = new NullBearingAccumulator();
+               final Map<String, Accumulator<?, ?>> accumulatorMap = new 
HashMap<>();
+               accumulatorMap.put(name, acc);
+
+               final StringifiedAccumulatorResult[] results = 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
+
+               assertEquals(1, results.length);
+
+               // Note the use of a String with a content of "null" rather 
than a null value
+               final StringifiedAccumulatorResult firstResult = results[0];
+               assertEquals(name, firstResult.getName());
+               assertEquals("NullBearingAccumulator", firstResult.getType());
+               assertEquals("null", firstResult.getValue());
+       }
+
+       @Test
+       public void 
stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
+               final String name = "a";
+               final Map<String, Accumulator<?, ?>> accumulatorMap = new 
HashMap<>();
+               accumulatorMap.put(name, null);
+
+               final StringifiedAccumulatorResult[] results = 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
+
+               assertEquals(1, results.length);
+
+               // Note the use of String values with content of "null" rather 
than null values
+               final StringifiedAccumulatorResult firstResult = results[0];
+               assertEquals(name, firstResult.getName());
+               assertEquals("null", firstResult.getType());
+               assertEquals("null", firstResult.getValue());
+       }
+
+       private static class NullBearingAccumulator implements 
SimpleAccumulator<Serializable> {
+
+               @Override
+               public void add(Serializable value) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Serializable getLocalValue() {
+                       return null;
+               }
+
+               @Override
+               public void resetLocal() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void merge(Accumulator<Serializable, Serializable> 
other) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Accumulator<Serializable, Serializable> clone() {
+                       return new NullBearingAccumulator();
+               }
+       }
+}

Reply via email to