Repository: beam Updated Branches: refs/heads/mr-runner 5fa0b14d2 -> b6f22aa76
Changed the mutation detector to be based on structural value only Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e3f6d6f1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e3f6d6f1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e3f6d6f1 Branch: refs/heads/mr-runner Commit: e3f6d6f1f0c1f9c9ca00ade17c4afedb7d3fef6b Parents: 239319b Author: Innocent Djiofack <[email protected]> Authored: Tue Jul 25 23:41:02 2017 -0400 Committer: Luke Cwik <[email protected]> Committed: Wed Nov 1 10:40:43 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/CoderUtils.java | 2 +- .../apache/beam/sdk/util/MutationDetectors.java | 79 +++++++------------- .../beam/sdk/util/MutationDetectorsTest.java | 56 ++++++++++++++ 3 files changed, 85 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e3f6d6f1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java index da77829..cfd8fde 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java @@ -153,7 +153,7 @@ public final class CoderUtils { * {@link Coder}. */ public static <T> T clone(Coder<T> coder, T value) throws CoderException { - return decodeFromByteArray(coder, encodeToByteArray(coder, value, Coder.Context.OUTER)); + return decodeFromByteArray(coder, encodeToByteArray(coder, value)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/e3f6d6f1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java index 3b593bf..3556667 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.util; -import java.util.Arrays; -import java.util.Objects; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -30,12 +28,12 @@ public class MutationDetectors { private MutationDetectors() {} /** - * Creates a new {@code MutationDetector} for the provided {@code value} that uses the provided - * {@link Coder} to perform deep copies and comparisons by serializing and deserializing values. - * - * <p>It is permissible for {@code value} to be {@code null}. Since {@code null} is immutable, - * the mutation check will always succeed. - */ + * Creates a new {@code MutationDetector} for the provided {@code value} that uses the provided + * {@link Coder} to perform deep copies and comparisons by serializing and deserializing values. + * + * <p>It is permissible for {@code value} to be {@code null}. Since {@code null} is immutable, + * the mutation check will always succeed. + */ public static <T> MutationDetector forValueWithCoder(T value, Coder<T> coder) throws CoderException { if (value == null) { @@ -59,7 +57,6 @@ public class MutationDetectors { * A {@link MutationDetector} for {@code null}, which is immutable. */ private static class NoopMutationDetector implements MutationDetector { - @Override public void verifyUnmodified() { } @@ -76,6 +73,7 @@ public class MutationDetectors { private static class CodedValueMutationDetector<T> implements MutationDetector { private final Coder<T> coder; + private final T clonedOriginalValue; /** * A saved pointer to an in-memory value provided upon construction, which we will check for @@ -97,11 +95,23 @@ public class MutationDetectors { private final T clonedOriginalObject; /** + * The structural value from {@link #possiblyModifiedObject}. It will be used during every call + * to {@link #verifyUnmodified}, which could be called many times throughout the lifetime of + * this {@link CodedValueMutationDetector}. + */ + private final Object originalStructuralValue; + + /** * Create a mutation detector for the provided {@code value}, using the provided {@link Coder} * for cloning and checking serialized forms for equality. */ public CodedValueMutationDetector(T value, Coder<T> coder) throws CoderException { this.coder = coder; + // We need to clone the original value before getting it's structural value. + // If the object is consistent with equals, the Structural value will be the exact + // same object reference making it impossible to detect changes. + clonedOriginalValue = CoderUtils.clone(coder, value); + this.originalStructuralValue = coder.structuralValue(clonedOriginalValue); this.possiblyModifiedObject = value; this.encodedOriginalObject = CoderUtils.encodeToByteArray(coder, value); this.clonedOriginalObject = CoderUtils.decodeFromByteArray(coder, encodedOriginalObject); @@ -117,49 +127,16 @@ public class MutationDetectors { } private void verifyUnmodifiedThrowingCheckedExceptions() throws CoderException { - // If either object believes they are equal, we trust that and short-circuit deeper checks. - if (Objects.equals(possiblyModifiedObject, clonedOriginalObject) - || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) { - return; - } - - // Since retainedObject is in general an instance of a subclass of T, when it is cloned to - // clonedObject using a Coder<T>, the two will generally be equivalent viewed as a T, but in - // general neither retainedObject.equals(clonedObject) nor clonedObject.equals(retainedObject) - // will hold. - // - // For example, CoderUtils.clone(IterableCoder<Integer>, IterableSubclass<Integer>) will - // produce an ArrayList<Integer> with the same contents as the IterableSubclass, but the - // latter will quite reasonably not consider itself equivalent to an ArrayList (and vice - // versa). - // - // To enable a reasonable comparison, we clone retainedObject again here, converting it to - // the same sort of T that the Coder<T> output when it created clonedObject. - T clonedPossiblyModifiedObject = CoderUtils.clone(coder, possiblyModifiedObject); - - // If deepEquals() then we trust the equals implementation. - // This deliberately allows fields to escape this check. - if (Objects.deepEquals(clonedPossiblyModifiedObject, clonedOriginalObject)) { - return; + // Since there is no guarantee that cloning an object via the coder will + // return the exact same type as value, We are cloning the possiblyModifiedObject + // before getting it's structural value. This way we are guaranteed to compare the same + // types. + T possiblyModifiedClonedValue = CoderUtils.clone(coder, possiblyModifiedObject); + Object newStructuralValue = coder.structuralValue(possiblyModifiedClonedValue); + if (originalStructuralValue.equals(newStructuralValue)) { + return; } - - // If not deepEquals(), the class may just have a poor equals() implementation. - // So we next try checking their serialized forms. We re-serialize instead of checking - // encodedObject, because the Coder may treat it differently. - // - // For example, an unbounded Iterable will be encoded in an unbounded way, but decoded into an - // ArrayList, which will then be re-encoded in a bounded format. So we really do need to - // encode-decode-encode retainedObject. - if (Arrays.equals( - CoderUtils.encodeToByteArray(coder, clonedOriginalObject), - CoderUtils.encodeToByteArray(coder, clonedPossiblyModifiedObject))) { - return; - } - - // If we got here, then they are not deepEquals() and do not have deepEquals() encodings. - // Even if there is some conceptual sense in which the objects are equivalent, it has not - // been adequately expressed in code. - illegalMutation(clonedOriginalObject, clonedPossiblyModifiedObject); + illegalMutation(clonedOriginalObject, possiblyModifiedClonedValue); } private void illegalMutation(T previousValue, T newValue) throws CoderException { http://git-wip-us.apache.org/repos/asf/beam/blob/e3f6d6f1/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MutationDetectorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MutationDetectorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MutationDetectorsTest.java index ebd8297..29e727b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MutationDetectorsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MutationDetectorsTest.java @@ -20,11 +20,17 @@ package org.apache.beam.sdk.util; import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -39,10 +45,48 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class MutationDetectorsTest { + /** + * Solely used to test that immutability is enforced from the SDK's perspective and not from + * Java's {@link Object#equals} method. Note that we do not expect users to create such + * an implementation. + */ + private class ForSDKMutationDetectionTestCoder extends AtomicCoder<Object> { + // Use a unique instance that is returned as the structural value making all structural + // values of this coder equivalent to each other. + private final Object uniqueInstance = new Object(); + + @Override + public void encode(Object value, OutputStream outStream) throws IOException { + } + + @Override + public Object decode(InputStream inStream) throws IOException { + return new AtomicInteger(); + } + + @Override + public Object structuralValue(Object value) { + return uniqueInstance; + } + } @Rule public ExpectedException thrown = ExpectedException.none(); /** + * Tests that mutation detection is enforced from the SDK point of view + * (Based on the {@link Coder#structuralValue}) and not from the Java's equals method. + */ + @Test + public void testMutationBasedOnStructuralValue() throws Exception { + AtomicInteger value = new AtomicInteger(); + MutationDetector detector = + MutationDetectors.forValueWithCoder(value, new ForSDKMutationDetectionTestCoder()); + // Even though we modified the value, we are relying on the fact that the structural + // value will be used to compare equality + value.incrementAndGet(); + detector.verifyUnmodified(); + } + /** * Tests that {@link MutationDetectors#forValueWithCoder} detects a mutation to a list. */ @Test @@ -93,6 +137,18 @@ public class MutationDetectorsTest { } /** + * Tests that {@link MutationDetectors#forValueWithCoder} does not false positive on a + * {@link Set} coded as an {@link Iterable}. + */ + @Test + public void testStructuralValue() throws Exception { + Set<Integer> value = Sets.newHashSet(Arrays.asList(1, 2, 3, 4)); + MutationDetector detector = + MutationDetectors.forValueWithCoder(value, IterableCoder.of(VarIntCoder.of())); + detector.verifyUnmodified(); + } + + /** * Tests that {@link MutationDetectors#forValueWithCoder} does not false positive on an * {@link Iterable} that is not known to be bounded; after coder-based cloning the bound * will be known and it will be a {@link List} so it will encode more compactly the second
