This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 77e2862 In SdkComponents (portability support lib), use equality to
compare Coders (#7462)
77e2862 is described below
commit 77e286299aa6d02930398e89df98254bf436cc3c
Author: CraigChambersG <[email protected]>
AuthorDate: Thu Jan 24 16:19:42 2019 -0800
In SdkComponents (portability support lib), use equality to compare Coders
(#7462)
Previously, SdkComponents compared Coders by identity rather than by
Coder.equals(). As a result, equivalent Coder instances were registered
separately, each getting its own Coder id and its own entry in the model
pipeline proto, etc. It also meant that Coders created during internal
expansion of PTransforms (e.g. Combine.perKey()'s accumulatorCoder) appeared
to be different from (and got a different id than) the equivalent Coder known
when the model pipeline proto was computed. This PR switches to equals() to
compare Coder [...]
---
.../runners/core/construction/SdkComponents.java | 8 +++-----
.../core/construction/PipelineTranslationTest.java | 5 ++---
.../runners/core/construction/SdkComponentsTest.java | 20 ++++----------------
3 files changed, 9 insertions(+), 24 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 40c367b..e44d724 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Equivalence;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashBiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
@@ -50,8 +49,7 @@ public class SdkComponents {
private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds =
HashBiMap.create();
/** A map of Coder to IDs. Coders are stored here with identity equivalence.
*/
- private final BiMap<Equivalence.Wrapper<? extends Coder<?>>, String>
coderIds =
- HashBiMap.create();
+ private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
private final BiMap<Environment, String> environmentIds = HashBiMap.create();
@@ -205,13 +203,13 @@ public class SdkComponents {
* same coder.
*/
public String registerCoder(Coder<?> coder) throws IOException {
- String existing = coderIds.get(Equivalence.identity().wrap(coder));
+ String existing = coderIds.get(coder);
if (existing != null) {
return existing;
}
String baseName = NameUtils.approximateSimpleName(coder);
String name = uniqify(baseName, coderIds.values());
- coderIds.put(Equivalence.identity().wrap(coder), name);
+ coderIds.put(coder, name);
RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
componentsBuilder.putCoders(name, coderProto);
return name;
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
index 3a6ff39..131118a 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Equivalence;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Test;
@@ -125,7 +124,7 @@ public class PipelineTranslationTest {
private boolean useDeprecatedViewTransforms;
Set<Node> transforms;
Set<PCollection<?>> pcollections;
- Set<Equivalence.Wrapper<? extends Coder<?>>> coders;
+ Set<Coder<?>> coders;
Set<WindowingStrategy<?, ?>> windowingStrategies;
int missingViewTransforms = 0;
@@ -195,7 +194,7 @@ public class PipelineTranslationTest {
}
private void addCoders(Coder<?> coder) {
- coders.add(Equivalence.identity().wrap(coder));
+ coders.add(coder);
if (CoderTranslation.KNOWN_CODER_URNS.containsKey(coder.getClass())) {
for (Coder<?> component : ((StructuredCoder<?>)
coder).getComponents()) {
addCoders(component);
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 660fcbf..9c813d9 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -68,7 +68,10 @@ public class SdkComponentsTest {
String id = components.registerCoder(coder);
assertThat(components.registerCoder(coder), equalTo(id));
assertThat(id, not(isEmptyOrNullString()));
- VarLongCoder otherCoder = VarLongCoder.of();
+ Coder<?> equalCoder =
+ KvCoder.of(StringUtf8Coder.of(),
IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+ assertThat(components.registerCoder(equalCoder), equalTo(id));
+ Coder<?> otherCoder = VarLongCoder.of();
assertThat(components.registerCoder(otherCoder), not(equalTo(id)));
components.toComponents().getCodersOrThrow(id);
@@ -76,21 +79,6 @@ public class SdkComponentsTest {
}
@Test
- public void registerCoderEqualsNotSame() throws IOException {
- Coder<?> coder =
- KvCoder.of(StringUtf8Coder.of(),
IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
- Coder<?> otherCoder =
- KvCoder.of(StringUtf8Coder.of(),
IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
- assertThat(coder, equalTo(otherCoder));
- String id = components.registerCoder(coder);
- String otherId = components.registerCoder(otherCoder);
- assertThat(otherId, not(equalTo(id)));
-
- components.toComponents().getCodersOrThrow(id);
- components.toComponents().getCodersOrThrow(otherId);
- }
-
- @Test
public void registerTransformNoChildren() throws IOException {
Create.Values<Integer> create = Create.of(1, 2, 3);
PCollection<Integer> pt = pipeline.apply(create);