Move shared DelegatingAggregator out of OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/139437bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/139437bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/139437bd Branch: refs/heads/master Commit: 139437bdca8872a11f6a87a9f54347985523faf2 Parents: 0d500ef Author: Kenneth Knowles <[email protected]> Authored: Fri Oct 21 11:45:38 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun Oct 23 19:52:51 2016 -0700 ---------------------------------------------------------------------- .../sdk/transforms/DelegatingAggregator.java | 125 +++++++++++++++++++ .../org/apache/beam/sdk/transforms/DoFn.java | 30 ++--- .../org/apache/beam/sdk/transforms/OldDoFn.java | 97 -------------- .../DoFnDelegatingAggregatorTest.java | 5 +- 4 files changed, 142 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java new file mode 100644 index 0000000..d92bb71 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import java.io.Serializable; +import java.util.Objects; +import java.util.UUID; +import org.apache.beam.sdk.transforms.Combine.CombineFn; + +/** + * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator. + * + * <p>This {@link Aggregator} is designed to be constructed without a delegate, at pipeline + * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it + * submits values must be provided by the runner at execution time. + * + * @param <AggInputT> the type of input element + * @param <AggOutputT> the type of output element + */ +class DelegatingAggregator<AggInputT, AggOutputT> + implements Aggregator<AggInputT, AggOutputT>, Serializable { + private final UUID id; + + private final String name; + + private final CombineFn<AggInputT, ?, AggOutputT> combineFn; + + private Aggregator<AggInputT, ?> delegate; + + public DelegatingAggregator(String name, + CombineFn<? super AggInputT, ?, AggOutputT> combiner) { + this.id = UUID.randomUUID(); + this.name = checkNotNull(name, "name cannot be null"); + // Safe contravariant cast + @SuppressWarnings("unchecked") + CombineFn<AggInputT, ?, AggOutputT> specificCombiner = + (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null"); + this.combineFn = specificCombiner; + } + + @Override + public void addValue(AggInputT value) { + if (delegate == null) { + throw new IllegalStateException( + String.format( + "addValue cannot be called on Aggregator outside of the execution of a %s.", + DoFn.class.getSimpleName())); + } else { + delegate.addValue(value); + } + } + + @Override + public String getName() { + return name; + } + + @Override + public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() { + return combineFn; + } + + /** + * Sets the current delegate of the Aggregator. + * + * @param delegate the delegate to set in this aggregator + */ + public void setDelegate(Aggregator<AggInputT, ?> delegate) { + this.delegate = delegate; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("name", name) + .add("combineFn", combineFn) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(id, name, combineFn.getClass()); + } + + /** + * Indicates whether some other object is "equal to" this one. + * + * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their + * CombineFns are the same class, and they have identical IDs. + */ + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null) { + return false; + } + if (o instanceof DelegatingAggregator) { + DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o; + return Objects.equals(this.id, that.id) + && Objects.equals(this.name, that.name) + && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass()); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 8b3aaf8..0531cbb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -35,8 +35,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -775,31 +775,31 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD } /** - * Returns an {@link Aggregator} with aggregation logic specified by the - * {@link CombineFn} argument. The name provided must be unique across - * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created - * during pipeline construction. + * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn} + * argument. The name provided must be unique across {@link Aggregator}s created within the {@link + * DoFn}. Aggregators can only be created during pipeline construction. * * @param name the name of the aggregator * @param combiner the {@link CombineFn} to use in the aggregator - * @return an aggregator for the provided name and combiner in the scope of - * this {@link DoFn} + * @return an aggregator for the provided name and combiner in the scope of this {@link DoFn} * @throws NullPointerException if the name or combiner is null - * @throws IllegalArgumentException if the given name collides with another - * aggregator in this scope + * @throws IllegalArgumentException if the given name collides with another aggregator in this + * scope * @throws IllegalStateException if called during pipeline execution. */ - public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> - createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) { + public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) { checkNotNull(name, "name cannot be null"); checkNotNull(combiner, "combiner cannot be null"); - checkArgument(!aggregators.containsKey(name), + checkArgument( + !aggregators.containsKey(name), "Cannot create aggregator with name %s." - + " An Aggregator with that name already exists within this scope.", + + " An Aggregator with that name already exists within this scope.", name); - checkState(!aggregatorsAreFinal, + checkState( + !aggregatorsAreFinal, "Cannot create an aggregator during pipeline execution." - + " Aggregators should be registered during pipeline construction."); + + " Aggregators should be registered during pipeline construction."); DelegatingAggregator<AggInputT, AggOutputT> aggregator = new DelegatingAggregator<>(name, combiner); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 72c2965..b269f47 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.MoreObjects; import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Objects; -import java.util.UUID; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; @@ -505,100 +502,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl } /** - * An {@link Aggregator} that delegates calls to addValue to another - * aggregator. - * - * @param <AggInputT> the type of input element - * @param <AggOutputT> the type of output element - */ - static class DelegatingAggregator<AggInputT, AggOutputT> implements - Aggregator<AggInputT, AggOutputT>, Serializable { - private final UUID id; - - private final String name; - - private final CombineFn<AggInputT, ?, AggOutputT> combineFn; - - private Aggregator<AggInputT, ?> delegate; - - public DelegatingAggregator(String name, - CombineFn<? super AggInputT, ?, AggOutputT> combiner) { - this.id = UUID.randomUUID(); - this.name = checkNotNull(name, "name cannot be null"); - // Safe contravariant cast - @SuppressWarnings("unchecked") - CombineFn<AggInputT, ?, AggOutputT> specificCombiner = - (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null"); - this.combineFn = specificCombiner; - } - - @Override - public void addValue(AggInputT value) { - if (delegate == null) { - throw new IllegalStateException( - "addValue cannot be called on Aggregator outside of the execution of a OldDoFn."); - } else { - delegate.addValue(value); - } - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() { - return combineFn; - } - - /** - * Sets the current delegate of the Aggregator. - * - * @param delegate the delegate to set in this aggregator - */ - public void setDelegate(Aggregator<AggInputT, ?> delegate) { - this.delegate = delegate; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("name", name) - .add("combineFn", combineFn) - .toString(); - } - - @Override - public int hashCode() { - return Objects.hash(id, name, combineFn.getClass()); - } - - /** - * Indicates whether some other object is "equal to" this one. - * - * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their - * CombineFns are the same class, and they have identical IDs. - */ - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (o == null) { - return false; - } - if (o instanceof DelegatingAggregator) { - DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o; - return Objects.equals(this.id, that.id) - && Objects.equals(this.name, that.name) - && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass()); - } - return false; - } - } - - /** * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}. */ private class AdaptedContext extends Context { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java index c072fd7..f51a6b0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -35,7 +34,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** - * Tests for {@link OldDoFn.DelegatingAggregator}. + * Tests for {@link DelegatingAggregator}. */ @RunWith(JUnit4.class) public class DoFnDelegatingAggregatorTest { @@ -63,7 +62,7 @@ public class DoFnDelegatingAggregatorTest { thrown.expect(IllegalStateException.class); thrown.expectMessage("cannot be called"); - thrown.expectMessage("OldDoFn"); + thrown.expectMessage("DoFn"); aggregator.addValue(21.2); }
