Repository: incubator-beam Updated Branches: refs/heads/master 3b36a65b2 -> baae9013c
Stop cloning coders in the InProcessRunner This is excessively slow and also not useful, as coders are required to be thread-safe Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a44214e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a44214e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a44214e Branch: refs/heads/master Commit: 2a44214e4aef5a5d377378b9a0ac3ff3360acefc Parents: a9387fc Author: Thomas Groh <[email protected]> Authored: Thu Apr 28 15:27:03 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Apr 28 15:53:43 2016 -0700 ---------------------------------------------------------------------- .../sdk/runners/inprocess/EncodabilityEnforcementFactory.java | 3 +-- .../sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java | 3 +-- .../sdk/runners/inprocess/ImmutabilityEnforcementFactory.java | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a44214e/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java index 02a36cf..d234d4f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -47,7 +46,7 @@ class EncodabilityEnforcementFactory implements ModelEnforcementFactory { private Coder<T> coder; public EncodabilityEnforcement(CommittedBundle<T> input) { - coder = SerializableUtils.clone(input.getPCollection().getCoder()); + coder = input.getPCollection().getCoder(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a44214e/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java index 0852269..04ece1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -87,7 +86,7 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory { public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) { this.underlying = underlying; mutationDetectors = HashMultimap.create(); - coder = SerializableUtils.clone(getPCollection().getCoder()); + coder = getPCollection().getCoder(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a44214e/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java index 028870a..2f21032 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -56,7 +55,7 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory { private ImmutabilityCheckingEnforcement( CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) { this.transform = transform; - coder = SerializableUtils.clone(input.getPCollection().getCoder()); + coder = input.getPCollection().getCoder(); mutationElements = new IdentityHashMap<>(); }
