This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e6b571e [BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)
e6b571e is described below
commit e6b571e6bc04fb6ecc6e75f5064e09337e6341e2
Author: Yichi Zhang <[email protected]>
AuthorDate: Mon Nov 2 11:06:08 2020 -0800
[BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)
---
.../apache/beam/runners/core/construction/SdkComponents.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 123a95e..fe4bc0b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -54,6 +54,7 @@ public class SdkComponents {
private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds =
HashBiMap.create();
private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
private final BiMap<Environment, String> environmentIds = HashBiMap.create();
+ private final BiMap<RunnerApi.Coder, String> coderProtoToId =
HashBiMap.create();
private final Set<String> requirements;
private final Set<String> reservedIds = new HashSet<>();
@@ -127,6 +128,7 @@ public class SdkComponents {
reservedIds.addAll(components.getEnvironmentsMap().keySet());
components.getEnvironmentsMap().forEach(environmentIds.inverse()::forcePut);
+ components.getCodersMap().forEach(coderProtoToId.inverse()::forcePut);
if (requirements != null) {
this.requirements.addAll(requirements);
@@ -264,10 +266,17 @@ public class SdkComponents {
if (existing != null) {
return existing;
}
+ // Unlike StructuredCoder, custom coders may not have proper
implementation of hashCode() and
+ // equals(), this lead to unnecessary duplications. In order to avoid this
we examine already
+ // registered coders and see if we can find a matching proto, and consider
them same coder.
+ RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
+ if (coderProtoToId.containsKey(coderProto)) {
+ return coderProtoToId.get(coderProto);
+ }
String baseName = NameUtils.approximateSimpleName(coder);
String name = uniqify(baseName, coderIds.values());
coderIds.put(coder, name);
- RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
+ coderProtoToId.put(coderProto, name);
componentsBuilder.putCoders(name, coderProto);
return name;
}