[FLINK-3126] [core] Remove accumulator type from "value" in web frontend
This closes #1868 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db85f385 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db85f385 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db85f385 Branch: refs/heads/master Commit: db85f385846a6541c0707ed1ba6fed78446423b5 Parents: 342db48 Author: Zack Pierce <[email protected]> Authored: Mon Apr 11 10:17:34 2016 -0700 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 13 01:10:55 2016 +0200 ---------------------------------------------------------------------- .../StringifiedAccumulatorResult.java | 14 +- .../StringifiedAccumulatorResultTest.java | 138 +++++++++++++++++++ 2 files changed, 149 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java index a0d1eda..c4faad1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java @@ -55,6 +55,9 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{ // Utilities // ------------------------------------------------------------------------ + /** + * Flatten a map of accumulator names to Accumulator instances into an array of StringifiedAccumulatorResult values + */ public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, Accumulator<?, ?>> accs) { if (accs == null || accs.isEmpty()) { return new StringifiedAccumulatorResult[0]; @@ -65,9 +68,14 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{ int i = 0; for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) { StringifiedAccumulatorResult result; - Accumulator<?, ?> value = entry.getValue(); - if (value != null) { - result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString()); + Accumulator<?, ?> accumulator = entry.getValue(); + if (accumulator != null) { + Object localValue = accumulator.getLocalValue(); + if (localValue != null) { + result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString()); + } else { + result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null"); + } } else { result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null"); } http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java new file mode 100644 index 0000000..e6d637b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.accumulators; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.SimpleAccumulator; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class StringifiedAccumulatorResultTest { + + @Test + public void testSerialization() throws IOException { + final String name = "a"; + final String type = "b"; + final String value = "c"; + final StringifiedAccumulatorResult original = new StringifiedAccumulatorResult(name, type, value); + + // Confirm no funny business in the constructor to getter pathway + assertEquals(name, original.getName()); + assertEquals(type, original.getType()); + assertEquals(value, original.getValue()); + + final StringifiedAccumulatorResult copy = CommonTestUtils.createCopySerializable(original); + + // Copy should have equivalent core fields + assertEquals(name, copy.getName()); + assertEquals(type, copy.getType()); + assertEquals(value, copy.getValue()); + } + + @Test + public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() { + final String name = "a"; + final int targetValue = 314159; + final IntCounter acc = new IntCounter(); + acc.add(targetValue); + final Map<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>(); + accumulatorMap.put(name, acc); + + + final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); + + assertEquals(1, results.length); + + final StringifiedAccumulatorResult firstResult = results[0]; + assertEquals(name, firstResult.getName()); + assertEquals("IntCounter", firstResult.getType()); + assertEquals(Integer.toString(targetValue), firstResult.getValue()); + } + + @Test + public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() { + final String name = "a"; + final NullBearingAccumulator acc = new NullBearingAccumulator(); + final Map<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>(); + accumulatorMap.put(name, acc); + + final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); + + assertEquals(1, results.length); + + // Note the use of a String with a content of "null" rather than a null value + final StringifiedAccumulatorResult firstResult = results[0]; + assertEquals(name, firstResult.getName()); + assertEquals("NullBearingAccumulator", firstResult.getType()); + assertEquals("null", firstResult.getValue()); + } + + @Test + public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() { + final String name = "a"; + final Map<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>(); + accumulatorMap.put(name, null); + + final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); + + assertEquals(1, results.length); + + // Note the use of String values with content of "null" rather than null values + final StringifiedAccumulatorResult firstResult = results[0]; + assertEquals(name, firstResult.getName()); + assertEquals("null", firstResult.getType()); + assertEquals("null", firstResult.getValue()); + } + + private static class NullBearingAccumulator implements SimpleAccumulator<Serializable> { + + @Override + public void add(Serializable value) { + throw new UnsupportedOperationException(); + } + + @Override + public Serializable getLocalValue() { + return null; + } + + @Override + public void resetLocal() { + throw new UnsupportedOperationException(); + } + + @Override + public void merge(Accumulator<Serializable, Serializable> other) { + throw new UnsupportedOperationException(); + } + + @Override + public Accumulator<Serializable, Serializable> clone() { + return new NullBearingAccumulator(); + } + } +}
