gemini-code-assist[bot] commented on code in PR #39043:
URL: https://github.com/apache/beam/pull/39043#discussion_r3446666769


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java:
##########
@@ -110,27 +110,32 @@ public WithKeys<K, V> withKeyType(TypeDescriptor<K> keyType) {
 
   @Override
   public PCollection<KV<K, V>> expand(PCollection<V> in) {
+    TypeDescriptor<V> inputType = in.getTypeDescriptor();
+    TypeDescriptor<KV<K, V>> outputType = getOutputTypeDescriptor(inputType);
     PCollection<KV<K, V>> result =
         in.apply(
             "AddKeys",
-            MapElements.via(
-                new SimpleFunction<V, KV<K, V>>() {
-                  @Override
-                  public KV<K, V> apply(V element) {
-                    return KV.of(fn.apply(element), element);
-                  }
-                }));
+            outputType == null
+                ? MapElements.via(
+                    new SimpleFunction<V, KV<K, V>>() {
+                      @Override
+                      public KV<K, V> apply(V element) {
+                        return KV.of(fn.apply(element), element);
+                      }
+                    })
+                : MapElements.into(outputType)
+                    .via(
+                        (SerializableFunction<V, KV<K, V>>)
+                            element -> KV.of(fn.apply(element), element)));

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The lambda `element -> KV.of(fn.apply(element), element)` reads the
   instance field `fn`, so it implicitly captures the enclosing `WithKeys`
   instance (`this`). In Apache Beam, a user-defined function or lambda that
   captures its `PTransform` is serialized together with the whole transform.
   That can fail at runtime with a `NotSerializableException` if the transform
   holds any non-serializable field, and even when serialization succeeds it
   needlessly enlarges the payload sent to workers.
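   A minimal, Beam-free sketch of the capture behavior described above, using only `java.io` serialization. `SerFn` is a hypothetical stand-in for Beam's `SerializableFunction`, and `Holder` plays the role of an enclosing object that is deliberately not serializable (a real `PTransform` is `Serializable`, so in Beam the failure surfaces only when serialization reaches a non-serializable field). The lambda that reads the field `fn` drags `Holder.this` along and fails; the one that reads a local copy serializes on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class LambdaCaptureDemo {

  /** Hypothetical stand-in for Beam's SerializableFunction. */
  public interface SerFn<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

  /** Plays the role of the enclosing transform, but is deliberately NOT Serializable. */
  public static class Holder {
    private final SerFn<String, Integer> fn = String::length;

    /** Reads the field fn (i.e. this.fn), so the lambda captures Holder.this. */
    public SerFn<String, String> capturingFn() {
      return element -> fn.apply(element) + ":" + element;
    }

    /** Reads only a local copy, so the lambda captures just localFn. */
    public SerFn<String, String> localCopyFn() {
      SerFn<String, Integer> localFn = this.fn;
      return element -> localFn.apply(element) + ":" + element;
    }
  }

  /** Serialized size in bytes, or -1 if something reachable from o is not Serializable. */
  public static int serializedSize(Object o) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    } catch (NotSerializableException e) {
      return -1; // the captured Holder cannot be written
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  public static void main(String[] args) {
    Holder holder = new Holder();
    System.out.println("capturing: " + serializedSize(holder.capturingFn()));
    System.out.println("local copy: " + serializedSize(holder.localCopyFn()));
  }
}
```

   Running `main` prints `capturing: -1`, followed by a positive byte count for the local-copy lambda.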
   
   To avoid capturing `this`, copy `this.fn` into a local variable (e.g.,
   `localFn`) before constructing the lambda and reference that local variable
   instead.
   
   ```suggestion
       SerializableFunction<V, K> localFn = this.fn;
       TypeDescriptor<V> inputType = in.getTypeDescriptor();
       TypeDescriptor<KV<K, V>> outputType = getOutputTypeDescriptor(inputType);
       PCollection<KV<K, V>> result =
           in.apply(
               "AddKeys",
               outputType == null
                   ? MapElements.via(
                       new SimpleFunction<V, KV<K, V>>() {
                         @Override
                         public KV<K, V> apply(V element) {
                           return KV.of(localFn.apply(element), element);
                         }
                       })
                   : MapElements.into(outputType)
                       .via(
                           (SerializableFunction<V, KV<K, V>>)
                               element -> KV.of(localFn.apply(element), element)));
   ```
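   With the suggestion applied, the lambda captures only `localFn`. The payload-size half of the argument can be sketched the same way, again with hypothetical stand-in types rather than `WithKeys` itself: `HeavyHolder` is `Serializable` but carries 64 KiB of state the key function never uses, so serializing the lambda that captures `this` also ships that array, while the local-copy lambda stays small.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class CapturePayloadDemo {

  /** Hypothetical stand-in for Beam's SerializableFunction. */
  public interface SerFn<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

  /** Serializable holder with a large field that the key function never reads. */
  public static class HeavyHolder implements Serializable {
    private final byte[] unusedState = new byte[64 * 1024];
    private final SerFn<String, Integer> fn = String::length;

    /** Captures HeavyHolder.this, so unusedState is serialized too. */
    public SerFn<String, String> capturingFn() {
      return element -> fn.apply(element) + ":" + element;
    }

    /** Captures only localFn, so unusedState stays behind. */
    public SerFn<String, String> localCopyFn() {
      SerFn<String, Integer> localFn = this.fn;
      return element -> localFn.apply(element) + ":" + element;
    }
  }

  /** Number of bytes Java serialization produces for o. */
  public static int serializedSize(Object o) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  public static void main(String[] args) {
    HeavyHolder holder = new HeavyHolder();
    System.out.println("capturing this: " + serializedSize(holder.capturingFn()) + " bytes");
    System.out.println("local copy: " + serializedSize(holder.localCopyFn()) + " bytes");
  }
}
```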



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.