Repository: incubator-beam Updated Branches: refs/heads/master f8f3745a8 -> 4f91c2eae
Move Model Enforcements into the proper module Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/111e3932 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/111e3932 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/111e3932 Branch: refs/heads/master Commit: 111e393219adfd7a41f8545f66a80c638dc38165 Parents: f8f3745 Author: Thomas Groh <[email protected]> Authored: Mon Mar 28 12:02:58 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon Mar 28 13:46:04 2016 -0700 ---------------------------------------------------------------------- .../inprocess/AbstractModelEnforcement.java | 36 -------- .../EncodabilityEnforcementFactory.java | 69 -------------- .../ImmutabilityEnforcementFactory.java | 94 -------------------- .../sdk/runners/inprocess/ModelEnforcement.java | 61 ------------- .../inprocess/ModelEnforcementFactory.java | 28 ------ .../inprocess/AbstractModelEnforcement.java | 36 ++++++++ .../EncodabilityEnforcementFactory.java | 69 ++++++++++++++ .../ImmutabilityEnforcementFactory.java | 94 ++++++++++++++++++++ .../sdk/runners/inprocess/ModelEnforcement.java | 61 +++++++++++++ .../inprocess/ModelEnforcementFactory.java | 28 ++++++ 10 files changed, 288 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java deleted file mode 100644 index 32b2a67..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java 
+++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.runners.inprocess; - -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.util.WindowedValue; - -/** - * An abstract {@link ModelEnforcement} that provides default empty implementations for each method. - */ -abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> { - @Override - public void beforeElement(WindowedValue<T> element) {} - - @Override - public void afterElement(WindowedValue<T> element) {} - - @Override - public void afterFinish( - CommittedBundle<T> input, - InProcessTransformResult result, - Iterable<? 
extends CommittedBundle<?>> outputs) {} -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java deleted file mode 100644 index 0e38b55..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package com.google.cloud.dataflow.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.util.CoderUtils; -import com.google.cloud.dataflow.sdk.util.SerializableUtils; -import com.google.cloud.dataflow.sdk.util.UserCodeException; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.PCollection; - -/** - * Enforces that all elements in a {@link PCollection} can be encoded using that - * {@link PCollection PCollection's} {@link Coder}. - */ -class EncodabilityEnforcementFactory implements ModelEnforcementFactory { - public static EncodabilityEnforcementFactory create() { - return new EncodabilityEnforcementFactory(); - } - - @Override - public <T> ModelEnforcement<T> forBundle( - CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { - return new EncodabilityEnforcement<>(input); - } - - private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> { - private Coder<T> coder; - - public EncodabilityEnforcement(CommittedBundle<T> input) { - coder = SerializableUtils.clone(input.getPCollection().getCoder()); - } - - @Override - public void beforeElement(WindowedValue<T> element) { - try { - T clone = CoderUtils.clone(coder, element.getValue()); - if (coder.consistentWithEquals()) { - checkArgument( - coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)), - "Coder %s of class %s does not maintain structural value equality" - + " on input element %s", - coder, - coder.getClass().getSimpleName(), - element.getValue()); - } - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java deleted file mode 100644 index dfc56a9..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package com.google.cloud.dataflow.sdk.runners.inprocess; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.CoderException; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.util.IllegalMutationException; -import com.google.cloud.dataflow.sdk.util.MutationDetector; -import com.google.cloud.dataflow.sdk.util.MutationDetectors; -import com.google.cloud.dataflow.sdk.util.SerializableUtils; -import com.google.cloud.dataflow.sdk.util.UserCodeException; -import com.google.cloud.dataflow.sdk.util.WindowedValue; - -import java.util.IdentityHashMap; -import java.util.Map; - -/** - * {@link ModelEnforcement} that enforces elements are not modified over the course of processing - * an element. - * - * <p>Implies {@link EncodabilityEnforcment}. - */ -class ImmutabilityEnforcementFactory implements ModelEnforcementFactory { - public static ModelEnforcementFactory create() { - return new ImmutabilityEnforcementFactory(); - } - - @Override - public <T> ModelEnforcement<T> forBundle( - CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { - return new ImmutabilityCheckingEnforcement<T>(input, consumer); - } - - private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> { - private final AppliedPTransform<?, ?, ?> transform; - private final Map<WindowedValue<T>, MutationDetector> mutationElements; - private final Coder<T> coder; - - private ImmutabilityCheckingEnforcement( - CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) { - this.transform = transform; - coder = SerializableUtils.clone(input.getPCollection().getCoder()); - mutationElements = new IdentityHashMap<>(); - } - - @Override - public void beforeElement(WindowedValue<T> element) { - try { - mutationElements.put( - element, 
MutationDetectors.forValueWithCoder(element.getValue(), coder)); - } catch (CoderException e) { - throw UserCodeException.wrap(e); - } - } - - @Override - public void afterFinish( - CommittedBundle<T> input, - InProcessTransformResult result, - Iterable<? extends CommittedBundle<?>> outputs) { - for (MutationDetector detector : mutationElements.values()) { - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException e) { - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s illegaly mutated value %s of class %s." - + " Input values must not be mutated in any way.", - transform.getFullName(), - e.getSavedValue(), - e.getSavedValue().getClass()), - e.getSavedValue(), - e.getNewValue())); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java deleted file mode 100644 index 66bea37..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package com.google.cloud.dataflow.sdk.runners.inprocess; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.PCollection; - -/** - * Enforcement tools that verify that executing code conforms to the model. - * - * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The - * {@link ModelEnforcement} is provided with the input bundle as part of - * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element - * before and after that element is provided to an underlying {@link TransformEvaluator}, and the - * output {@link InProcessTransformResult} and committed output bundles after the - * {@link TransformEvaluator} has completed. - * - * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder} - * of the input {@link PCollection} on construction, and then enforce per-element behavior - * (such as the immutability of input elements). When the element is output or the bundle is - * completed, the required conditions can be enforced across all elements. - */ -public interface ModelEnforcement<T> { - /** - * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the - * provided {@link WindowedValue}. - */ - void beforeElement(WindowedValue<T> element); - - /** - * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the - * provided {@link WindowedValue}. - */ - void afterElement(WindowedValue<T> element); - - /** - * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been - * called, producing the provided {@link InProcessTransformResult} and - * {@link CommittedBundle output bundles}. 
- */ - void afterFinish( - CommittedBundle<T> input, - InProcessTransformResult result, - Iterable<? extends CommittedBundle<?>> outputs); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java deleted file mode 100644 index 66c01b3..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.runners.inprocess; - -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; - -/** - * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input - * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the - * {@link TransformEvaluator} is created. 
- */ -public interface ModelEnforcementFactory { - <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java new file mode 100644 index 0000000..32b2a67 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.util.WindowedValue; + +/** + * An abstract {@link ModelEnforcement} that provides default empty implementations for each method. 
+ */ +abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> { + @Override + public void beforeElement(WindowedValue<T> element) {} + + @Override + public void afterElement(WindowedValue<T> element) {} + + @Override + public void afterFinish( + CommittedBundle<T> input, + InProcessTransformResult result, + Iterable<? extends CommittedBundle<?>> outputs) {} +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java new file mode 100644 index 0000000..0e38b55 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.cloud.dataflow.sdk.util.SerializableUtils; +import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; + +/** + * Enforces that all elements in a {@link PCollection} can be encoded using that + * {@link PCollection PCollection's} {@link Coder}. + */ +class EncodabilityEnforcementFactory implements ModelEnforcementFactory { + public static EncodabilityEnforcementFactory create() { + return new EncodabilityEnforcementFactory(); + } + + @Override + public <T> ModelEnforcement<T> forBundle( + CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { + return new EncodabilityEnforcement<>(input); + } + + private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> { + private Coder<T> coder; + + public EncodabilityEnforcement(CommittedBundle<T> input) { + coder = SerializableUtils.clone(input.getPCollection().getCoder()); + } + + @Override + public void beforeElement(WindowedValue<T> element) { + try { + T clone = CoderUtils.clone(coder, element.getValue()); + if (coder.consistentWithEquals()) { + checkArgument( + coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)), + "Coder %s of class %s does not maintain structural value equality" + + " on input element %s", + coder, + coder.getClass().getSimpleName(), + element.getValue()); + } + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java new file mode 100644 index 0000000..dfc56a9 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.util.IllegalMutationException; +import com.google.cloud.dataflow.sdk.util.MutationDetector; +import com.google.cloud.dataflow.sdk.util.MutationDetectors; +import com.google.cloud.dataflow.sdk.util.SerializableUtils; +import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.WindowedValue; + +import java.util.IdentityHashMap; +import java.util.Map; + +/** + * {@link ModelEnforcement} that enforces elements are not modified over the course of processing + * an element. + * + * <p>Implies {@link EncodabilityEnforcementFactory}. + */ +class ImmutabilityEnforcementFactory implements ModelEnforcementFactory { + public static ModelEnforcementFactory create() { + return new ImmutabilityEnforcementFactory(); + } + + @Override + public <T> ModelEnforcement<T> forBundle( + CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { + return new ImmutabilityCheckingEnforcement<T>(input, consumer); + } + + private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> { + private final AppliedPTransform<?, ?, ?> transform; + private final Map<WindowedValue<T>, MutationDetector> mutationElements; + private final Coder<T> coder; + + private ImmutabilityCheckingEnforcement( + CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) { + this.transform = transform; + coder = SerializableUtils.clone(input.getPCollection().getCoder()); + mutationElements = new IdentityHashMap<>(); + } + + @Override + public void beforeElement(WindowedValue<T> element) { + try { + mutationElements.put( + element,
MutationDetectors.forValueWithCoder(element.getValue(), coder)); + } catch (CoderException e) { + throw UserCodeException.wrap(e); + } + } + + @Override + public void afterFinish( + CommittedBundle<T> input, + InProcessTransformResult result, + Iterable<? extends CommittedBundle<?>> outputs) { + for (MutationDetector detector : mutationElements.values()) { + try { + detector.verifyUnmodified(); + } catch (IllegalMutationException e) { + throw UserCodeException.wrap( + new IllegalMutationException( + String.format( + "PTransform %s illegaly mutated value %s of class %s." + + " Input values must not be mutated in any way.", + transform.getFullName(), + e.getSavedValue(), + e.getSavedValue().getClass()), + e.getSavedValue(), + e.getNewValue())); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java new file mode 100644 index 0000000..66bea37 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; + +/** + * Enforcement tools that verify that executing code conforms to the model. + * + * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The + * {@link ModelEnforcement} is provided with the input bundle as part of + * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element + * before and after that element is provided to an underlying {@link TransformEvaluator}, and the + * output {@link InProcessTransformResult} and committed output bundles after the + * {@link TransformEvaluator} has completed. + * + * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder} + * of the input {@link PCollection}) on construction, and then enforce per-element behavior + * (such as the immutability of input elements). When the element is output or the bundle is + * completed, the required conditions can be enforced across all elements. + */ +public interface ModelEnforcement<T> { + /** + * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the + * provided {@link WindowedValue}. + */ + void beforeElement(WindowedValue<T> element); + + /** + * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the + * provided {@link WindowedValue}.
+ */ + void afterElement(WindowedValue<T> element); + + /** + * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been + * called, producing the provided {@link InProcessTransformResult} and + * {@link CommittedBundle output bundles}. + */ + void afterFinish( + CommittedBundle<T> input, + InProcessTransformResult result, + Iterable<? extends CommittedBundle<?>> outputs); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java new file mode 100644 index 0000000..66c01b3 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; + +/** + * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input + * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the + * {@link TransformEvaluator} is created. + */ +public interface ModelEnforcementFactory { + <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer); +}
