This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0e83270 [BEAM-6979] Fix Dataflow's handling of the new well known
Double coder. (#8288)
0e83270 is described below
commit 0e83270608169dba8aa05d5e29107d7ee85469fa
Author: Lukasz Cwik <[email protected]>
AuthorDate: Fri Apr 12 10:38:39 2019 -0700
[BEAM-6979] Fix Dataflow's handling of the new well known Double coder.
(#8288)
---
.../beam/runners/dataflow/util/CloudObjects.java | 27 ++++++++++++++++++++--
.../worker/graph/LengthPrefixUnknownCoders.java | 3 ++-
2 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 01c9383..ee78f6f 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -22,17 +22,40 @@ import static
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.Timer;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
/** Utilities for converting an object to a {@link CloudObject}. */
public class CloudObjects {
private CloudObjects() {}
+ // All the coders the Dataflow service understands. This is a subset of all
Beam Model coders.
+ static final Set<Class<? extends Coder>> DATAFLOW_KNOWN_CODERS =
+ ImmutableSet.of(
+ ByteArrayCoder.class,
+ KvCoder.class,
+ VarLongCoder.class,
+ IntervalWindowCoder.class,
+ IterableCoder.class,
+ Timer.Coder.class,
+ LengthPrefixCoder.class,
+ GlobalWindow.Coder.class,
+ FullWindowedValueCoder.class);
+
static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends
Coder>>
CODER_TRANSLATORS = populateCoderTranslators();
static final Map<String, CloudObjectTranslator<? extends Coder>>
@@ -77,7 +100,7 @@ public class CloudObjects {
DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
encoding = customCoderTranslator.toCloudObject(coder, sdkComponents);
}
- if (sdkComponents != null && !ModelCoderRegistrar.isKnownCoder(coder)) {
+ if (sdkComponents != null &&
!DATAFLOW_KNOWN_CODERS.contains(coder.getClass())) {
try {
String coderId = sdkComponents.registerCoder(coder);
Structs.addString(encoding, PropertyNames.PIPELINE_PROTO_CODER_ID,
coderId);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 1ad7835..d4b9e6c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -71,7 +71,8 @@ public class LengthPrefixUnknownCoders {
"kind:fixed_big_endian_int32",
"kind:fixed_big_endian_int64",
"kind:var_int32",
- "kind:void");
+ "kind:void",
+ "org.apache.beam.sdk.coders.DoubleCoder");
private static final String LENGTH_PREFIX_CODER_TYPE = "kind:length_prefix";