Repository: incubator-beam
Updated Branches:
  refs/heads/master 460505175 -> f8f3745a8


Add ModelEnforcements

ModelEnforcements ensure that user-written code conforms to the model,
in order to ensure that it is portable between runners.

Add EncodabilityEnforcement and ImmutabilityEnforcement.
EncodabilityEnforcement ensures that all values can be encoded.
ImmutabilityEnforcement ensures that values are not mutated within the
processing of a Bundle.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4343e6f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4343e6f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4343e6f9

Branch: refs/heads/master
Commit: 4343e6f99b8a88d38bcdaf340fc8d2f0a5125b94
Parents: 9247ad7
Author: Thomas Groh <[email protected]>
Authored: Fri Mar 4 08:53:50 2016 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Mar 28 10:18:39 2016 -0700

----------------------------------------------------------------------
 .../inprocess/AbstractModelEnforcement.java     |  36 +++
 .../EncodabilityEnforcementFactory.java         |  69 +++++
 .../ImmutabilityEnforcementFactory.java         |  94 +++++++
 .../sdk/runners/inprocess/ModelEnforcement.java |  61 +++++
 .../inprocess/ModelEnforcementFactory.java      |  28 ++
 .../EncodabilityEnforcementFactoryTest.java     | 260 +++++++++++++++++++
 .../ImmutabilityEnforcementFactoryTest.java     | 130 ++++++++++
 7 files changed, 678 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
new file mode 100644
index 0000000..32b2a67
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+/**
+ * An abstract {@link ModelEnforcement} that provides default empty 
implementations for each method.
+ */
+abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
+  @Override
+  public void beforeElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
new file mode 100644
index 0000000..0e38b55
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforces that all elements in a {@link PCollection} can be encoded using 
that
+ * {@link PCollection PCollection's} {@link Coder}.
+ */
+class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static EncodabilityEnforcementFactory create() {
+    return new EncodabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new EncodabilityEnforcement<>(input);
+  }
+
+  private static class EncodabilityEnforcement<T> extends 
AbstractModelEnforcement<T> {
+    private Coder<T> coder;
+
+    public EncodabilityEnforcement(CommittedBundle<T> input) {
+      coder = SerializableUtils.clone(input.getPCollection().getCoder());
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        T clone = CoderUtils.clone(coder, element.getValue());
+        if (coder.consistentWithEquals()) {
+          checkArgument(
+              
coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
+              "Coder %s of class %s does not maintain structural value 
equality"
+                  + " on input element %s",
+              coder,
+              coder.getClass().getSimpleName(),
+              element.getValue());
+        }
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
new file mode 100644
index 0000000..dfc56a9
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.MutationDetector;
+import com.google.cloud.dataflow.sdk.util.MutationDetectors;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * {@link ModelEnforcement} that enforces elements are not modified over the 
course of processing
+ * an element.
+ *
+ * <p>Implies {@link EncodabilityEnforcment}.
+ */
+class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static ModelEnforcementFactory create() {
+    return new ImmutabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new ImmutabilityCheckingEnforcement<T>(input, consumer);
+  }
+
+  private static class ImmutabilityCheckingEnforcement<T> extends 
AbstractModelEnforcement<T> {
+    private final AppliedPTransform<?, ?, ?> transform;
+    private final Map<WindowedValue<T>, MutationDetector> mutationElements;
+    private final Coder<T> coder;
+
+    private ImmutabilityCheckingEnforcement(
+        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
+      this.transform = transform;
+      coder = SerializableUtils.clone(input.getPCollection().getCoder());
+      mutationElements = new IdentityHashMap<>();
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        mutationElements.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), 
coder));
+      } catch (CoderException e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    @Override
+    public void afterFinish(
+        CommittedBundle<T> input,
+        InProcessTransformResult result,
+        Iterable<? extends CommittedBundle<?>> outputs) {
+      for (MutationDetector detector : mutationElements.values()) {
+        try {
+          detector.verifyUnmodified();
+        } catch (IllegalMutationException e) {
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s illegaly mutated value %s of class %s."
+                          + " Input values must not be mutated in any way.",
+                      transform.getFullName(),
+                      e.getSavedValue(),
+                      e.getSavedValue().getClass()),
+                  e.getSavedValue(),
+                  e.getNewValue()));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
new file mode 100644
index 0000000..66bea37
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforcement tools that verify that executing code conforms to the model.
+ *
+ * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
+ * {@link ModelEnforcement} is provided with the input bundle as part of
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, 
AppliedPTransform)}, each element
+ * before and after that element is provided to an underlying {@link 
TransformEvaluator}, and the
+ * output {@link InProcessTransformResult} and committed output bundles after 
the
+ * {@link TransformEvaluator} has completed.
+ *
+ * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such 
as the {@link Coder}
+ * of the input {@link PCollection} on construction, and then enforce 
per-element behavior
+ * (such as the immutability of input elements). When the element is output or 
the bundle is
+ * completed, the required conditions can be enforced across all elements.
+ */
+public interface ModelEnforcement<T> {
+  /**
+   * Called before a call to {@link 
TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void beforeElement(WindowedValue<T> element);
+
+  /**
+   * Called after a call to {@link 
TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void afterElement(WindowedValue<T> element);
+
+  /**
+   * Called after a bundle has been completed and {@link 
TransformEvaluator#finishBundle()} has been
+   * called, producing the provided {@link InProcessTransformResult} and
+   * {@link CommittedBundle output bundles}.
+   */
+  void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
new file mode 100644
index 0000000..66c01b3
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+
+/**
+ * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} 
on an input
+ * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are 
created before the
+ * {@link TransformEvaluator} is created.
+ */
+public interface ModelEnforcementFactory {
+  <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, 
AppliedPTransform<?, ?, ?> consumer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..dcc9775
--- /dev/null
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+/**
+ * Tests for {@link EncodabilityEnforcementFactory}.
+ */
+public class EncodabilityEnforcementFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private EncodabilityEnforcementFactory factory = 
EncodabilityEnforcementFactory.create();
+
+  @Test
+  public void encodeFailsThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new 
Record());
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Encode not allowed");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void decodeFailsThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new 
Record());
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Decode not allowed");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void consistentWithEqualsStructuralValueNotEqualThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new 
RecordStructuralValueCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record =
+        WindowedValue.<Record>valueInGlobalWindow(
+            new Record() {
+              @Override
+              public String toString() {
+                return "OriginalRecord";
+              }
+            });
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("does not maintain structural value equality");
+    thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
+    thrown.expectMessage("OriginalRecord");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(
+            Create.of(new Record())
+                .withCoder(new 
RecordNotConsistentWithEqualsStructuralValueCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record = 
WindowedValue.<Record>valueInGlobalWindow(new Record());
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    enforcement.beforeElement(record);
+    enforcement.afterElement(record);
+    enforcement.afterFinish(
+        input,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void structurallyEqualResultsSucceeds() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> unencodable = 
p.apply(Create.of(1).withCoder(VarIntCoder.of()));
+    AppliedPTransform<?, ?, ?> consumer =
+        
unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal();
+
+    WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
+
+    CommittedBundle<Integer> input =
+        InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now());
+    ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
+
+    enforcement.beforeElement(value);
+    enforcement.afterElement(value);
+    enforcement.afterFinish(
+        input,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  private static class Record {}
+  private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      throw new CoderException("Encode not allowed");
+    }
+
+    @Override
+    public Record decode(
+        InputStream inStream, 
com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return null;
+    }
+  }
+
+  private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, 
com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      throw new CoderException("Decode not allowed");
+    }
+  }
+
+  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, 
com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+  private static class RecordNotConsistentWithEqualsStructuralValueCoder
+      extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, 
com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return false;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..87e12ce
--- /dev/null
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Collections;
+
+/**
+ * Tests for {@link ImmutabilityEnforcementFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ImmutabilityEnforcementFactoryTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  private transient ImmutabilityEnforcementFactory factory;
+  private transient PCollection<byte[]> pcollection;
+  private transient AppliedPTransform<?, ?, ?> consumer;
+
+  @Before
+  public void setup() {
+    factory = new ImmutabilityEnforcementFactory();
+    TestPipeline p = TestPipeline.create();
+    pcollection =
+        p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
+            .apply(
+                ParDo.of(
+                    new DoFn<byte[], byte[]>() {
+                      @Override
+                      public void processElement(DoFn<byte[], 
byte[]>.ProcessContext c)
+                          throws Exception {
+                        c.element()[0] = 'b';
+                      }
+                    }));
+    consumer = 
pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
+  }
+
+  @Test
+  public void unchangedSucceeds() {
+    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        
InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
+    enforcement.beforeElement(element);
+    enforcement.afterElement(element);
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void mutatedDuringProcessElementThrows() {
+    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        
InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
+    enforcement.beforeElement(element);
+    element.getValue()[0] = 'f';
+    thrown.equals(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage(consumer.getFullName());
+    thrown.expectMessage("illegaly mutated");
+    thrown.expectMessage("Input values must not be mutated");
+    enforcement.afterElement(element);
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void mutatedAfterProcessElementFails() {
+
+    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        
InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
+    enforcement.beforeElement(element);
+    enforcement.afterElement(element);
+
+    element.getValue()[0] = 'f';
+    thrown.equals(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage(consumer.getFullName());
+    thrown.expectMessage("illegaly mutated");
+    thrown.expectMessage("Input values must not be mutated");
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+}
+

Reply via email to