gemini-code-assist[bot] commented on code in PR #39043:
URL: https://github.com/apache/beam/pull/39043#discussion_r3446666769
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java:
##########
@@ -110,27 +110,32 @@ public WithKeys<K, V> withKeyType(TypeDescriptor<K> keyType) {
   @Override
   public PCollection<KV<K, V>> expand(PCollection<V> in) {
+    TypeDescriptor<V> inputType = in.getTypeDescriptor();
+    TypeDescriptor<KV<K, V>> outputType = getOutputTypeDescriptor(inputType);
     PCollection<KV<K, V>> result =
         in.apply(
             "AddKeys",
-            MapElements.via(
-                new SimpleFunction<V, KV<K, V>>() {
-                  @Override
-                  public KV<K, V> apply(V element) {
-                    return KV.of(fn.apply(element), element);
-                  }
-                }));
+            outputType == null
+                ? MapElements.via(
+                    new SimpleFunction<V, KV<K, V>>() {
+                      @Override
+                      public KV<K, V> apply(V element) {
+                        return KV.of(fn.apply(element), element);
+                      }
+                    })
+                : MapElements.into(outputType)
+                    .via(
+                        (SerializableFunction<V, KV<K, V>>)
+                            element -> KV.of(fn.apply(element), element)));
Review Comment:

The lambda expression `element -> KV.of(fn.apply(element), element)` reads
the instance field `fn`, so it implicitly captures the enclosing `WithKeys`
instance (`this`) rather than just the function. In Apache Beam, capturing
the `PTransform` instance in a user-defined function or lambda can cause a
`NotSerializableException` at runtime if the transform contains
non-serializable fields. It also unnecessarily increases the size of the
serialized payload sent to workers.
To avoid capturing `this`, copy `this.fn` to a local variable (e.g.,
`localFn`) before the lambda and reference that local variable instead.
```suggestion
    SerializableFunction<V, K> localFn = this.fn;
    TypeDescriptor<V> inputType = in.getTypeDescriptor();
    TypeDescriptor<KV<K, V>> outputType = getOutputTypeDescriptor(inputType);
    PCollection<KV<K, V>> result =
        in.apply(
            "AddKeys",
            outputType == null
                ? MapElements.via(
                    new SimpleFunction<V, KV<K, V>>() {
                      @Override
                      public KV<K, V> apply(V element) {
                        return KV.of(localFn.apply(element), element);
                      }
                    })
                : MapElements.into(outputType)
                    .via(
                        (SerializableFunction<V, KV<K, V>>)
                            element -> KV.of(localFn.apply(element), element)));
```
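
For reference, the capture behavior can be reproduced outside Beam. The
following is a minimal standalone sketch that uses only `java.io`. The names
`CaptureDemo`, `SerFn`, and `Holder` are hypothetical stand-ins (for Beam's
`SerializableFunction` and for a transform with a non-serializable field) and
are not part of this PR or of the Beam API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class CaptureDemo {
  /** Hypothetical stand-in for a serializable function interface. */
  interface SerFn<A, B> extends Serializable {
    B apply(A input);
  }

  /** Hypothetical stand-in for a transform; deliberately NOT Serializable. */
  static class Holder {
    private final SerFn<String, Integer> fn = String::length;

    /** Reads the field inside the lambda, so the lambda captures `this`. */
    SerFn<String, String> capturingThis() {
      return s -> fn.apply(s) + ":" + s;
    }

    /** Copies the field to a local first, so only `localFn` is captured. */
    SerFn<String, String> capturingLocal() {
      SerFn<String, Integer> localFn = this.fn;
      return s -> localFn.apply(s) + ":" + s;
    }
  }

  /** Returns true if Java serialization accepts the object, false otherwise. */
  static boolean canSerialize(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Holder holder = new Holder();
    System.out.println("capturing this:  " + canSerialize(holder.capturingThis()));
    System.out.println("capturing local: " + canSerialize(holder.capturingLocal()));
  }
}
```

In this sketch the first lambda fails to serialize because its captured
`Holder` is not `Serializable`, while the second serializes because it only
holds the function itself. `WithKeys` is itself serializable, so the concern
there is the extra payload and any non-serializable state reachable from the
transform.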