damccorm commented on code in PR #38303:
URL: https://github.com/apache/beam/pull/38303#discussion_r3275134529
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java:
##########
@@ -124,31 +124,27 @@ public KV<K, V> apply(V element) {
try {
Coder<K> keyCoder;
CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
+
if (keyType == null) {
- keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
+ try {
+ keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
+ } catch (CannotProvideCoderException e) {
+ // fallback for lambda (Integer output)
+ keyCoder = (Coder<K>) VarIntCoder.of();
+ }
} else {
keyCoder = coderRegistry.getCoder(keyType);
}
- // TODO: Remove when we can set the coder inference context.
- result.setCoder(KvCoder.of(keyCoder, in.getCoder()));
+
+ result.setCoder(
+ KvCoder.of((Coder<K>) (Object)
SerializableCoder.of(Serializable.class), in.getCoder()));
+
} catch (CannotProvideCoderException exc) {
- if (keyType != null) {
- try {
- SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();
- SchemaCoder<K> schemaCoder =
- SchemaCoder.of(
- schemaRegistry.getSchema(keyType),
- keyType,
- schemaRegistry.getToRowFunction(keyType),
- schemaRegistry.getFromRowFunction(keyType));
- result.setCoder(KvCoder.of(schemaCoder, in.getCoder()));
- } catch (NoSuchSchemaException exception) {
- // No Schema.
- }
- }
- // let lazy coder inference have a try
+ // Fallback: use SerializableCoder. We use a wildcard cast to avoid
+ // raw type warnings while still allowing the fallback to function.
+ Coder<K> fallbackCoder = (Coder<K>) (Coder<?>)
SerializableCoder.of(Serializable.class);
Review Comment:
This doesn't make sense - why would we do this instead of the existing
schema inference
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java:
##########
@@ -124,31 +124,27 @@ public KV<K, V> apply(V element) {
try {
Coder<K> keyCoder;
CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
+
if (keyType == null) {
- keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
+ try {
+ keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
+ } catch (CannotProvideCoderException e) {
+ // fallback for lambda (Integer output)
+ keyCoder = (Coder<K>) VarIntCoder.of();
Review Comment:
Why would we assume an int here?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java:
##########
@@ -124,31 +124,27 @@ public KV<K, V> apply(V element) {
try {
Coder<K> keyCoder;
CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
+
if (keyType == null) {
- keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
+ try {
+ keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
+ } catch (CannotProvideCoderException e) {
+ // fallback for lambda (Integer output)
+ keyCoder = (Coder<K>) VarIntCoder.of();
+ }
} else {
keyCoder = coderRegistry.getCoder(keyType);
}
- // TODO: Remove when we can set the coder inference context.
- result.setCoder(KvCoder.of(keyCoder, in.getCoder()));
+
+ result.setCoder(
+ KvCoder.of((Coder<K>) (Object)
SerializableCoder.of(Serializable.class), in.getCoder()));
Review Comment:
Why would we change this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]