This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e2862  In SdkComponents (portability support lib), use equality to 
compare Coders (#7462)
77e2862 is described below

commit 77e286299aa6d02930398e89df98254bf436cc3c
Author: CraigChambersG <[email protected]>
AuthorDate: Thu Jan 24 16:19:42 2019 -0800

    In SdkComponents (portability support lib), use equality to compare Coders 
(#7462)
    
    Previously, identity was used to compare Coders instead of Coder equals(). 
This led to equivalent but separately created Coders getting separate Coder 
ids and entries in the model pipeline proto, etc. It also led to Coders created 
during internal expansion of PTransforms (e.g. Combine.perKey()'s 
accumulatorCoder) appearing to differ from (and getting a different id than) 
the equivalent Coder known when the model pipeline proto was computed. This PR 
switches to equals() to compare Coder [...]
---
 .../runners/core/construction/SdkComponents.java     |  8 +++-----
 .../core/construction/PipelineTranslationTest.java   |  5 ++---
 .../runners/core/construction/SdkComponentsTest.java | 20 ++++----------------
 3 files changed, 9 insertions(+), 24 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 40c367b..e44d724 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Equivalence;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashBiMap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
@@ -50,8 +49,7 @@ public class SdkComponents {
   private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds = 
HashBiMap.create();
 
   /** A map of Coder to IDs. Coders are stored here with identity equivalence. 
*/
-  private final BiMap<Equivalence.Wrapper<? extends Coder<?>>, String> 
coderIds =
-      HashBiMap.create();
+  private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
 
   private final BiMap<Environment, String> environmentIds = HashBiMap.create();
 
@@ -205,13 +203,13 @@ public class SdkComponents {
    * same coder.
    */
   public String registerCoder(Coder<?> coder) throws IOException {
-    String existing = coderIds.get(Equivalence.identity().wrap(coder));
+    String existing = coderIds.get(coder);
     if (existing != null) {
       return existing;
     }
     String baseName = NameUtils.approximateSimpleName(coder);
     String name = uniqify(baseName, coderIds.values());
-    coderIds.put(Equivalence.identity().wrap(coder), name);
+    coderIds.put(coder, name);
     RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
     componentsBuilder.putCoders(name, coderProto);
     return name;
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
index 3a6ff39..131118a 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Equivalence;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.joda.time.Duration;
 import org.junit.Test;
@@ -125,7 +124,7 @@ public class PipelineTranslationTest {
     private boolean useDeprecatedViewTransforms;
     Set<Node> transforms;
     Set<PCollection<?>> pcollections;
-    Set<Equivalence.Wrapper<? extends Coder<?>>> coders;
+    Set<Coder<?>> coders;
     Set<WindowingStrategy<?, ?>> windowingStrategies;
     int missingViewTransforms = 0;
 
@@ -195,7 +194,7 @@ public class PipelineTranslationTest {
     }
 
     private void addCoders(Coder<?> coder) {
-      coders.add(Equivalence.identity().wrap(coder));
+      coders.add(coder);
       if (CoderTranslation.KNOWN_CODER_URNS.containsKey(coder.getClass())) {
         for (Coder<?> component : ((StructuredCoder<?>) 
coder).getComponents()) {
           addCoders(component);
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 660fcbf..9c813d9 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -68,7 +68,10 @@ public class SdkComponentsTest {
     String id = components.registerCoder(coder);
     assertThat(components.registerCoder(coder), equalTo(id));
     assertThat(id, not(isEmptyOrNullString()));
-    VarLongCoder otherCoder = VarLongCoder.of();
+    Coder<?> equalCoder =
+        KvCoder.of(StringUtf8Coder.of(), 
IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+    assertThat(components.registerCoder(equalCoder), equalTo(id));
+    Coder<?> otherCoder = VarLongCoder.of();
     assertThat(components.registerCoder(otherCoder), not(equalTo(id)));
 
     components.toComponents().getCodersOrThrow(id);
@@ -76,21 +79,6 @@ public class SdkComponentsTest {
   }
 
   @Test
-  public void registerCoderEqualsNotSame() throws IOException {
-    Coder<?> coder =
-        KvCoder.of(StringUtf8Coder.of(), 
IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
-    Coder<?> otherCoder =
-        KvCoder.of(StringUtf8Coder.of(), 
IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
-    assertThat(coder, equalTo(otherCoder));
-    String id = components.registerCoder(coder);
-    String otherId = components.registerCoder(otherCoder);
-    assertThat(otherId, not(equalTo(id)));
-
-    components.toComponents().getCodersOrThrow(id);
-    components.toComponents().getCodersOrThrow(otherId);
-  }
-
-  @Test
   public void registerTransformNoChildren() throws IOException {
     Create.Values<Integer> create = Create.of(1, 2, 3);
     PCollection<Integer> pt = pipeline.apply(create);

Reply via email to