gemini-code-assist[bot] commented on code in PR #38255:
URL: https://github.com/apache/beam/pull/38255#discussion_r3116807119


##########
runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static 
org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.eligibleForGlobalGroupBy;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.eligibleForGroupByWindow;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.explodeWindowedKey;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.valueKey;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.valueValue;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.windowedKV;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
+import java.io.Serializable;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.List;
+
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the 
build-in aggregation

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Typo in Javadoc: `build-in` should be `built-in`.
   
   ```suggestion
    * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with 
the built-in aggregation
   ```



##########
runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invoke;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invokeIfNotNull;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.match;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.replace;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.tuple;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.catalyst.SerializerBuildHelper;
+import 
org.apache.spark.sql.catalyst.SerializerBuildHelper.MapElementInformation;
+import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.expressions.BoundReference;
+import org.apache.spark.sql.catalyst.expressions.Coalesce;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.EqualTo;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.GetStructField;
+import org.apache.spark.sql.catalyst.expressions.If;
+import org.apache.spark.sql.catalyst.expressions.IsNotNull;
+import org.apache.spark.sql.catalyst.expressions.IsNull;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.catalyst.expressions.MapKeys;
+import org.apache.spark.sql.catalyst.expressions.MapValues;
+import org.apache.spark.sql.catalyst.expressions.objects.MapObjects$;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.ObjectType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.MutablePair;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.IndexedSeq;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/** {@link Encoders} utility class. */
+public class EncoderHelpers {
+  private static final DataType OBJECT_TYPE = new ObjectType(Object.class);
+  private static final DataType TUPLE2_TYPE = new ObjectType(Tuple2.class);
+  private static final DataType WINDOWED_VALUE = new 
ObjectType(WindowedValue.class);
+  private static final DataType KV_TYPE = new ObjectType(KV.class);
+  private static final DataType MUTABLE_PAIR_TYPE = new 
ObjectType(MutablePair.class);
+  private static final DataType LIST_TYPE = new ObjectType(List.class);
+
+  // Collections / maps of these types can be (de)serialized without 
(de)serializing each member
+  private static final Set<Class<?>> PRIMITIV_TYPES =

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Typo in constant name: `PRIMITIV_TYPES` should be `PRIMITIVE_TYPES`.
   
   ```suggestion
     private static final Set<Class<?>> PRIMITIVE_TYPES =
   ```



##########
runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.eligibleForGlobalGroupBy;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.eligibleForGroupByWindow;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.explodeWindowedKey;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.value;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.valueKey;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.valueValue;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers.windowedKV;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+
+import java.util.Collection;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.expressions.Aggregator;
+import scala.Tuple2;
+import scala.collection.IterableOnce;
+
+/**
+ * Translator for {@link Combine.PerKey} using {@link Dataset#groupByKey} with 
a Spark {@link
+ * Aggregator}.
+ *
+ * <ul>
+ *   <li>When using the default global window, window information is dropped 
and restored after the
+ *       aggregation.
+ *   <li>For non-merging windows, windows are exploded and moved into a 
composite key for better
+ *       distribution. After the aggregation, windowed values are restored 
from the composite key.
+ *   <li>All other cases use an aggregator on windowed values that is 
optimized for the current
+ *       windowing strategy.
+ * </ul>
+ *
+ * TODOs:
+ * <li>combine with context (CombineFnWithContext)?
+ * <li>combine with sideInputs?
+ * <li>other there other missing features?

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Grammar issue in TODO comment: `other there other missing features?` should 
be `are there other missing features?`.
   
   ```suggestion
    * <li>are there other missing features?
   ```



##########
runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invoke;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invokeIfNotNull;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.match;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.replace;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.tuple;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.catalyst.SerializerBuildHelper;
+import 
org.apache.spark.sql.catalyst.SerializerBuildHelper.MapElementInformation;
+import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.expressions.BoundReference;
+import org.apache.spark.sql.catalyst.expressions.Coalesce;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.EqualTo;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.GetStructField;
+import org.apache.spark.sql.catalyst.expressions.If;
+import org.apache.spark.sql.catalyst.expressions.IsNotNull;
+import org.apache.spark.sql.catalyst.expressions.IsNull;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.catalyst.expressions.MapKeys;
+import org.apache.spark.sql.catalyst.expressions.MapValues;
+import org.apache.spark.sql.catalyst.expressions.objects.MapObjects$;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.ObjectType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.MutablePair;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.IndexedSeq;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/** {@link Encoders} utility class. */
+public class EncoderHelpers {
+  private static final DataType OBJECT_TYPE = new ObjectType(Object.class);
+  private static final DataType TUPLE2_TYPE = new ObjectType(Tuple2.class);
+  private static final DataType WINDOWED_VALUE = new 
ObjectType(WindowedValue.class);
+  private static final DataType KV_TYPE = new ObjectType(KV.class);
+  private static final DataType MUTABLE_PAIR_TYPE = new 
ObjectType(MutablePair.class);
+  private static final DataType LIST_TYPE = new ObjectType(List.class);
+
+  // Collections / maps of these types can be (de)serialized without 
(de)serializing each member
+  private static final Set<Class<?>> PRIMITIV_TYPES =
+      ImmutableSet.of(
+          Boolean.class,
+          Byte.class,
+          Short.class,
+          Integer.class,
+          Long.class,
+          Float.class,
+          Double.class);
+
+  // Default encoders by class
+  private static final Map<Class<?>, Encoder<?>> DEFAULT_ENCODERS = new 
ConcurrentHashMap<>();
+
+  // Factory for default encoders by class
+  private static @Nullable Encoder<?> encoderFactory(Class<?> cls) {
+    if (cls.equals(PaneInfo.class)) {
+      return paneInfoEncoder();
+    } else if (cls.equals(GlobalWindow.class)) {
+      return binaryEncoder(GlobalWindow.Coder.INSTANCE, false);
+    } else if (cls.equals(IntervalWindow.class)) {
+      return binaryEncoder(IntervalWindowCoder.of(), false);
+    } else if (cls.equals(Instant.class)) {
+      return instantEncoder();
+    } else if (cls.equals(String.class)) {
+      return Encoders.STRING();
+    } else if (cls.equals(Boolean.class)) {
+      return Encoders.BOOLEAN();
+    } else if (cls.equals(Integer.class)) {
+      return Encoders.INT();
+    } else if (cls.equals(Long.class)) {
+      return Encoders.LONG();
+    } else if (cls.equals(Float.class)) {
+      return Encoders.FLOAT();
+    } else if (cls.equals(Double.class)) {
+      return Encoders.DOUBLE();
+    } else if (cls.equals(BigDecimal.class)) {
+      return Encoders.DECIMAL();
+    } else if (cls.equals(byte[].class)) {
+      return Encoders.BINARY();
+    } else if (cls.equals(Byte.class)) {
+      return Encoders.BYTE();
+    } else if (cls.equals(Short.class)) {
+      return Encoders.SHORT();
+    }
+    return null;
+  }
+
+  @SuppressWarnings({"nullness", "methodref.return"}) // computeIfAbsent 
allows null returns
+  private static <T> @Nullable Encoder<T> getOrCreateDefaultEncoder(Class<? 
super T> cls) {
+    return (Encoder<T>) DEFAULT_ENCODERS.computeIfAbsent(cls, 
EncoderHelpers::encoderFactory);
+  }
+
+  /** Gets or creates a default {@link Encoder} for {@link T}. */
+  public static <T> Encoder<T> encoderOf(Class<? super T> cls) {
+    Encoder<T> enc = getOrCreateDefaultEncoder(cls);
+    if (enc == null) {
+      throw new IllegalArgumentException("No default coder available for class 
" + cls);
+    }
+    return enc;
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} for {@link T} of {@link 
DataTypes#BinaryType BinaryType}
+   * delegating to a Beam {@link Coder} underneath.
+   *
+   * <p>Note: For common types, if available, default Spark {@link Encoder}s 
are used instead.
+   *
+   * @param coder Beam {@link Coder}
+   */
+  public static <T> Encoder<T> encoderFor(Coder<T> coder) {
+    Encoder<T> enc = 
getOrCreateDefaultEncoder(coder.getEncodedTypeDescriptor().getRawType());
+    return enc != null ? enc : binaryEncoder(coder, true);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} for {@link T} of {@link StructType} with 
fields {@code value},
+   * {@code timestamp}, {@code window} and {@code pane}.
+   *
+   * @param value {@link Encoder} to encode field `{@code value}`.
+   * @param window {@link Encoder} to encode individual windows in field 
`{@code window}`
+   */
+  public static <T, W extends BoundedWindow> Encoder<WindowedValue<T>> 
windowedValueEncoder(
+      Encoder<T> value, Encoder<W> window) {
+    Encoder<Instant> timestamp = encoderOf(Instant.class);
+    Encoder<PaneInfo> paneInfo = encoderOf(PaneInfo.class);
+    Encoder<Collection<W>> windows = collectionEncoder(window);
+    Expression serializer =
+        serializeWindowedValue(rootRef(WINDOWED_VALUE, true), value, 
timestamp, windows, paneInfo);
+    Expression deserializer =
+        deserializeWindowedValue(
+            rootCol(serializer.dataType()), value, timestamp, windows, 
paneInfo);
+    return EncoderFactory.create(serializer, deserializer, 
WindowedValue.class);
+  }
+
+  /**
+   * Creates a one-of Spark {@link Encoder} of {@link StructType} where each 
alternative is
+   * represented as colum / field named by its index with a separate {@link 
Encoder} each.
+   *
+   * <p>Externally this is represented as tuple {@code (index, data)} where an 
index corresponds to
+   * an {@link Encoder} in the provided list.
+   *
+   * @param encoders {@link Encoder}s for each alternative.
+   */
+  public static <T> Encoder<Tuple2<Integer, T>> oneOfEncoder(List<Encoder<T>> 
encoders) {
+    Expression serializer = serializeOneOf(rootRef(TUPLE2_TYPE, true), 
encoders);
+    Expression deserializer = deserializeOneOf(rootCol(serializer.dataType()), 
encoders);
+    return EncoderFactory.create(serializer, deserializer, Tuple2.class);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} for {@link KV} of {@link StructType} with 
fields {@code key}
+   * and {@code value}.
+   *
+   * @param key {@link Encoder} to encode field `{@code key}`.
+   * @param value {@link Encoder} to encode field `{@code value}`
+   */
+  public static <K, V> Encoder<KV<K, V>> kvEncoder(Encoder<K> key, Encoder<V> 
value) {
+    Expression serializer = serializeKV(rootRef(KV_TYPE, true), key, value);
+    Expression deserializer = deserializeKV(rootCol(serializer.dataType()), 
key, value);
+    return EncoderFactory.create(serializer, deserializer, KV.class);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} of {@link ArrayType} for Java {@link 
Collection}s with nullable
+   * elements.
+   *
+   * @param enc {@link Encoder} to encode collection elements
+   */
+  public static <T> Encoder<Collection<T>> collectionEncoder(Encoder<T> enc) {
+    return collectionEncoder(enc, true);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} of {@link ArrayType} for Java {@link 
Collection}s.
+   *
+   * @param enc {@link Encoder} to encode collection elements
+   * @param nullable Allow nullable collection elements
+   */
+  public static <T> Encoder<Collection<T>> collectionEncoder(Encoder<T> enc, 
boolean nullable) {
+    DataType type = new ObjectType(Collection.class);
+    Expression serializer = serializeSeq(rootRef(type, true), enc, nullable);
+    Expression deserializer = deserializeSeq(rootCol(serializer.dataType()), 
enc, nullable, true);
+    return EncoderFactory.create(serializer, deserializer, Collection.class);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} of {@link MapType} that deserializes to 
{@link MapT}.
+   *
+   * @param key {@link Encoder} to encode keys
+   * @param value {@link Encoder} to encode values
+   * @param cls Specific class to use, supported are {@link HashMap} and 
{@link TreeMap}
+   */
+  public static <MapT extends Map<K, V>, K, V> Encoder<MapT> mapEncoder(
+      Encoder<K> key, Encoder<V> value, Class<MapT> cls) {
+    Expression serializer = mapSerializer(rootRef(new ObjectType(cls), true), 
key, value);
+    Expression deserializer = mapDeserializer(rootCol(serializer.dataType()), 
key, value, cls);
+    return EncoderFactory.create(serializer, deserializer, cls);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} for Spark's {@link MutablePair} of {@link 
StructType} with
+   * fields `{@code _1}` and `{@code _2}`.
+   *
+   * <p>This is intended to be used in places such as aggregators.
+   *
+   * @param enc1 {@link Encoder} to encode `{@code _1}`
+   * @param enc2 {@link Encoder} to encode `{@code _2}`
+   */
+  public static <T1, T2> Encoder<MutablePair<T1, T2>> mutablePairEncoder(
+      Encoder<T1> enc1, Encoder<T2> enc2) {
+    Expression serializer = serializeMutablePair(rootRef(MUTABLE_PAIR_TYPE, 
true), enc1, enc2);
+    Expression deserializer = 
deserializeMutablePair(rootCol(serializer.dataType()), enc1, enc2);
+    return EncoderFactory.create(serializer, deserializer, MutablePair.class);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} for {@link PaneInfo} of {@link 
DataTypes#BinaryType
+   * BinaryType}.
+   */
+  private static Encoder<PaneInfo> paneInfoEncoder() {
+    DataType type = new ObjectType(PaneInfo.class);
+    return EncoderFactory.create(
+        invokeIfNotNull(Utils.class, "paneInfoToBytes", BinaryType, 
rootRef(type, false)),
+        invokeIfNotNull(Utils.class, "paneInfoFromBytes", type, 
rootCol(BinaryType)),
+        PaneInfo.class);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} for Joda {@link Instant} of {@link 
DataTypes#LongType
+   * LongType}.
+   */
+  private static Encoder<Instant> instantEncoder() {
+    DataType type = new ObjectType(Instant.class);
+    Expression instant = rootRef(type, true);
+    Expression millis = rootCol(LongType);
+    return EncoderFactory.create(
+        nullSafe(instant, invoke(instant, "getMillis", LongType, false)),
+        nullSafe(millis, invoke(Instant.class, "ofEpochMilli", type, millis)),
+        Instant.class);
+  }
+
+  /**
+   * Creates a Spark {@link Encoder} for {@link T} of {@link 
DataTypes#BinaryType BinaryType}
+   * delegating to a Beam {@link Coder} underneath.
+   *
+   * @param coder Beam {@link Coder}
+   * @param nullable If to allow nullable items
+   */
+  private static <T> Encoder<T> binaryEncoder(Coder<T> coder, boolean 
nullable) {
+    Literal litCoder = lit(coder, Coder.class);
+    // T could be private, use OBJECT_TYPE for code generation to not risk an 
IllegalAccessError
+    return EncoderFactory.create(
+        invokeIfNotNull(
+            CoderHelpers.class,
+            "toByteArray",
+            BinaryType,
+            rootRef(OBJECT_TYPE, nullable),
+            litCoder),
+        invokeIfNotNull(
+            CoderHelpers.class, "fromByteArray", OBJECT_TYPE, 
rootCol(BinaryType), litCoder),
+        coder.getEncodedTypeDescriptor().getRawType());
+  }
+
+  private static <T, W extends BoundedWindow> Expression 
serializeWindowedValue(
+      Expression in,
+      Encoder<T> valueEnc,
+      Encoder<Instant> timestampEnc,
+      Encoder<Collection<W>> windowsEnc,
+      Encoder<PaneInfo> paneEnc) {
+    return serializerObject(
+        in,
+        tuple("value", serializeField(in, valueEnc, "getValue")),
+        tuple("timestamp", serializeField(in, timestampEnc, "getTimestamp")),
+        tuple("windows", serializeField(in, windowsEnc, "getWindows")),
+        tuple("paneInfo", serializeField(in, paneEnc, "getPaneInfo")));
+  }
+
+  private static Expression serializerObject(Expression in, Tuple2<String, 
Expression>... fields) {
+    return SerializerBuildHelper.createSerializerForObject(in, seqOf(fields));
+  }
+
+  private static <T, W extends BoundedWindow> Expression 
deserializeWindowedValue(
+      Expression in,
+      Encoder<T> valueEnc,
+      Encoder<Instant> timestampEnc,
+      Encoder<Collection<W>> windowsEnc,
+      Encoder<PaneInfo> paneEnc) {
+    Expression value = deserializeField(in, valueEnc, 0, "value");
+    Expression windows = deserializeField(in, windowsEnc, 2, "windows");
+    Expression timestamp = deserializeField(in, timestampEnc, 1, "timestamp");
+    Expression paneInfo = deserializeField(in, paneEnc, 3, "paneInfo");
+    // set timestamp to end of window (maxTimestamp) if null
+    timestamp =
+        ifNotNull(timestamp, invoke(Utils.class, "maxTimestamp", 
timestamp.dataType(), windows));
+    Expression[] fields = new Expression[] {value, timestamp, windows, 
paneInfo};
+
+    return nullSafe(paneInfo, invoke(WindowedValues.class, "of", 
WINDOWED_VALUE, fields));
+  }
+
+  private static <K, V> Expression serializeMutablePair(
+      Expression in, Encoder<K> enc1, Encoder<V> enc2) {
+    return serializerObject(
+        in,
+        tuple("_1", serializeField(in, enc1, "_1")),
+        tuple("_2", serializeField(in, enc2, "_2")));
+  }
+
+  private static <K, V> Expression deserializeMutablePair(
+      Expression in, Encoder<K> enc1, Encoder<V> enc2) {
+    Expression field1 = deserializeField(in, enc1, 0, "_1");
+    Expression field2 = deserializeField(in, enc2, 1, "_2");
+    return invoke(MutablePair.class, "apply", MUTABLE_PAIR_TYPE, field1, 
field2);
+  }
+
+  private static <K, V> Expression serializeKV(
+      Expression in, Encoder<K> keyEnc, Encoder<V> valueEnc) {
+    return serializerObject(
+        in,
+        tuple("key", serializeField(in, keyEnc, "getKey")),
+        tuple("value", serializeField(in, valueEnc, "getValue")));
+  }
+
+  private static <K, V> Expression deserializeKV(
+      Expression in, Encoder<K> keyEnc, Encoder<V> valueEnc) {
+    Expression key = deserializeField(in, keyEnc, 0, "key");
+    Expression value = deserializeField(in, valueEnc, 1, "value");
+    return invoke(KV.class, "of", KV_TYPE, key, value);
+  }
+
+  public static <T> Expression serializeOneOf(Expression in, List<Encoder<T>> 
encoders) {
+    Expression type = invoke(in, "_1", IntegerType, false);
+    Expression[] args = new Expression[encoders.size() * 2];
+    for (int i = 0; i < encoders.size(); i++) {
+      args[i * 2] = lit(String.valueOf(i));
+      args[i * 2 + 1] = serializeOneOfField(in, type, encoders.get(i), i);
+    }
+    return new CreateNamedStruct(seqOf(args));
+  }
+
+  public static <T> Expression deserializeOneOf(Expression in, 
List<Encoder<T>> encoders) {
+    Expression[] args = new Expression[encoders.size()];
+    for (int i = 0; i < encoders.size(); i++) {
+      args[i] = deserializeOneOfField(in, encoders.get(i), i);
+    }
+    return new Coalesce(seqOf(args));
+  }
+
+  private static <T> Expression serializeOneOfField(
+      Expression in, Expression type, Encoder<T> enc, int typeIdx) {
+    Expression litNull = lit(null, serializedType(enc));
+    Expression value = invoke(in, "_2", deserializedType(enc), false);
+    return new If(new EqualTo(type, lit(typeIdx)), serialize(value, enc), 
litNull);
+  }
+
+  private static <T> Expression deserializeOneOfField(Expression in, 
Encoder<T> enc, int idx) {
+    GetStructField field = new GetStructField(in, idx, Option.empty());
+    Expression litNull = lit(null, TUPLE2_TYPE);
+    Expression newTuple =
+        EncoderFactory.newInstance(Tuple2.class, TUPLE2_TYPE, lit(idx), 
deserialize(field, enc));
+    return new If(new IsNull(field), litNull, newTuple);
+  }
+
+  private static <T> Expression serializeField(Expression in, Encoder<T> enc, 
String getterName) {
+    Expression ref = 
serializer(enc).collect(match(BoundReference.class)).head();
+    return serialize(invoke(in, getterName, ref.dataType(), ref.nullable()), 
enc);
+  }
+
+  private static <T> Expression deserializeField(
+      Expression in, Encoder<T> enc, int idx, String name) {
+    return deserialize(new GetStructField(in, idx, new Some<>(name)), enc);
+  }
+
+  // Note: Currently this doesn't support nullable primitive values
+  private static <K, V> Expression mapSerializer(Expression map, Encoder<K> 
key, Encoder<V> value) {
+    DataType keyType = deserializedType(key);
+    DataType valueType = deserializedType(value);
+    return SerializerBuildHelper.createSerializerForMap(
+        map,
+        new MapElementInformation(keyType, false, e -> serialize(e, key)),
+        new MapElementInformation(valueType, false, e -> serialize(e, value)));
+  }
+
+  private static <MapT extends Map<K, V>, K, V> Expression mapDeserializer(
+      Expression in, Encoder<K> key, Encoder<V> value, Class<MapT> cls) {
+    Preconditions.checkArgument(cls.isAssignableFrom(HashMap.class) || 
cls.equals(TreeMap.class));
+    Expression keys = deserializeSeq(new MapKeys(in), key, false, false);
+    Expression values = deserializeSeq(new MapValues(in), value, false, false);
+    String fn = cls.equals(TreeMap.class) ? "toTreeMap" : "toMap";
+    return invoke(
+        Utils.class, fn, new ObjectType(cls), keys, values, mapItemType(key), 
mapItemType(value));
+  }
+
+  // serialized type for primitive types (avoid boxing!), otherwise the 
deserialized type
+  private static Literal mapItemType(Encoder<?> enc) {
+    return lit(isPrimitiveEnc(enc) ? serializedType(enc) : 
deserializedType(enc), DataType.class);
+  }
+
+  private static <T> Expression serializeSeq(Expression in, Encoder<T> enc, 
boolean nullable) {
+    if (isPrimitiveEnc(enc)) {
+      Expression array = invoke(in, "toArray", new ObjectType(Object[].class), 
false);
+      return SerializerBuildHelper.createSerializerForGenericArray(
+          array, serializedType(enc), nullable);
+    }
+    Expression seq = invoke(Utils.class, "toSeq", new ObjectType(Seq.class), 
in);
+    return MapObjects$.MODULE$.apply(
+        exp -> serialize(exp, enc), seq, deserializedType(enc), nullable, 
Option.empty());
+  }
+
+  private static <T> Expression deserializeSeq(
+      Expression in, Encoder<T> enc, boolean nullable, boolean exposeAsJava) {
+    DataType type = serializedType(enc); // input type is the serializer 
result type
+    if (isPrimitiveEnc(enc)) {
+      // Spark may reuse unsafe array data, if directly exposed it must be 
copied before
+      return exposeAsJava
+          ? invoke(Utils.class, "copyToList", LIST_TYPE, in, lit(type, 
DataType.class))
+          : in;
+    }
+    Option<Class<?>> optCls = exposeAsJava ? Option.apply(List.class) : 
Option.empty();
+    // MapObjects will always copy
+    return MapObjects$.MODULE$.apply(exp -> deserialize(exp, enc), in, type, 
nullable, optCls);
+  }
+
+  private static <T> boolean isPrimitiveEnc(Encoder<T> enc) {
+    return PRIMITIV_TYPES.contains(enc.clsTag().runtimeClass());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Update reference to renamed constant `PRIMITIVE_TYPES`.
   
   ```suggestion
       return PRIMITIVE_TYPES.contains(enc.clsTag().runtimeClass());
   ```



##########
runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.emptyList;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.replace;
+import static 
org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders;
+import org.apache.spark.sql.catalyst.encoders.AgnosticExpressionPathEncoder;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.expressions.BoundReference;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.objects.Invoke;
+import org.apache.spark.sql.catalyst.expressions.objects.NewInstance;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.Option;
+import scala.collection.Iterator;
+import scala.collection.immutable.Seq;
+import scala.reflect.ClassTag;
+
+public class EncoderFactory {
+  // Resolve the Scala case-class primary constructor (the one with the most 
parameters).
+  // Constructor ordering returned by Class.getConstructors() is JVM-defined 
and not stable
+  // across Spark versions, so we pick the widest constructor explicitly and 
then dispatch on
+  // parameter count below to pick the right argument shape per Spark version.
+  private static final Constructor<StaticInvoke> STATIC_INVOKE_CONSTRUCTOR =
+      primaryConstructor(StaticInvoke.class);
+
+  private static final Constructor<Invoke> INVOKE_CONSTRUCTOR = 
primaryConstructor(Invoke.class);
+
+  private static final Constructor<NewInstance> NEW_INSTANCE_CONSTRUCTOR =
+      primaryConstructor(NewInstance.class);
+
+  @SuppressWarnings("unchecked")
+  private static <T> Constructor<T> primaryConstructor(Class<T> cls) {
+    Constructor<?>[] ctors = cls.getConstructors();
+    Constructor<?> widest = ctors[0];
+    for (int i = 1; i < ctors.length; i++) {
+      if (ctors[i].getParameterCount() > widest.getParameterCount()) {
+        widest = ctors[i];
+      }
+    }
+    return (Constructor<T>) widest;
+  }
+
+  @SuppressWarnings({"nullness", "unchecked"})
+  static <T> ExpressionEncoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    AgnosticEncoder<T> agnosticEncoder = new BeamAgnosticEncoder<>(serializer, 
deserializer, clazz);
+    return ExpressionEncoder.apply(agnosticEncoder, serializer, deserializer);
+  }
+
+  /**
+   * An {@link AgnosticEncoder} that implements both {@link 
AgnosticExpressionPathEncoder} (so that
+   * {@code SerializerBuildHelper} / {@code DeserializerBuildHelper} delegate 
to our pre-built
+   * expressions) and {@link AgnosticEncoders.StructEncoder} (so that {@code
+   * Dataset.select(TypedColumn)} creates an N-attribute plan instead of a 
1-attribute wrapped plan,
+   * preventing {@code FIELD_NUMBER_MISMATCH} errors).
+   *
+   * <p>The {@code toCatalyst} / {@code fromCatalyst} methods substitute the 
{@code input}
+   * expression into the pre-built serializer / deserializer via {@code 
transformUp}, so that when
+   * this encoder is nested inside a composite encoder (e.g. {@code 
Encoders.tuple}) the correct
+   * field-level expression is used in place of the root {@code 
BoundReference} / {@code
+   * GetColumnByOrdinal}.
+   */
+  @SuppressWarnings({"nullness", "unchecked", "deprecation"})
+  private static final class BeamAgnosticEncoder<T>
+      implements AgnosticExpressionPathEncoder<T>, 
AgnosticEncoders.StructEncoder<T> {
+
+    private final Expression serializer;
+    private final Expression deserializer;
+    private final Class<? super T> clazz;
+    private final Seq<AgnosticEncoders.EncoderField> encoderFields;
+
+    BeamAgnosticEncoder(Expression serializer, Expression deserializer, 
Class<? super T> clazz) {
+      this.serializer = serializer;
+      this.deserializer = deserializer;
+      this.clazz = clazz;
+      this.encoderFields = buildFields(serializer.dataType());
+    }
+
+    private static Seq<AgnosticEncoders.EncoderField> buildFields(DataType dt) 
{
+      if (dt instanceof StructType) {
+        StructField[] structFields = ((StructType) dt).fields();
+        List<AgnosticEncoders.EncoderField> fields = new 
ArrayList<>(structFields.length);
+        for (StructField sf : structFields) {
+          fields.add(
+              new AgnosticEncoders.EncoderField(
+                  sf.name(),
+                  new FieldEncoder<>(sf.dataType(), sf.nullable()),
+                  sf.nullable(),
+                  sf.metadata(),
+                  Option.empty(),
+                  Option.empty()));
+        }
+        return seqOf(fields.toArray(new AgnosticEncoders.EncoderField[0]));
+      } else {
+        // Non-struct: wrap in a single "value" field so StructEncoder sees 
one field.
+        return seqOf(
+            new AgnosticEncoders.EncoderField(
+                "value",
+                new FieldEncoder<>(dt, true),
+                true,
+                Metadata.empty(),
+                Option.empty(),
+                Option.empty()));
+      }
+    }
+
+    // --- AgnosticExpressionPathEncoder ---
+
+    @Override
+    public Expression toCatalyst(Expression input) {
+      return serializer.transformUp(replace(BoundReference.class, input));
+    }
+
+    @Override
+    public Expression fromCatalyst(Expression input) {
+      return deserializer.transformUp(replace(GetColumnByOrdinal.class, 
input));
+    }
+
+    // --- AgnosticEncoders.StructEncoder ---
+
+    @Override
+    public Seq<AgnosticEncoders.EncoderField> fields() {
+      return encoderFields;
+    }
+
+    @Override
+    public boolean isStruct() {
+      return true;
+    }
+
+    /**
+     * Setter required by the Scala compiler when implementing the {@link
+     * AgnosticEncoders.StructEncoder} trait from Java. Scala traits with 
concrete {@code val}
+     * fields generate a synthetic mangled setter ({@code 
<trait>$_setter_<field>_$eq}) that the
+     * trait's initializer invokes on subclasses. Java cannot declare {@code 
val} fields, so we
+     * implement {@link #isStruct()} directly above and accept-but-ignore the 
trait setter here. The
+     * mangled name is brittle and tied to Spark's Scala source layout — if 
Spark removes the {@code
+     * isStruct} field from {@code StructEncoder}, this method becomes dead 
code; if Spark renames
+     * it, compilation will fail and the new mangled name must be substituted.
+     */
+    @Override
+    public void
+        
org$apache$spark$sql$catalyst$encoders$AgnosticEncoders$StructEncoder$_setter_$isStruct_$eq(
+            boolean v) {
+      // no-op: isStruct() is implemented directly above
+    }
+
+    // --- AgnosticEncoder / Encoder (explicit to resolve default-method 
ambiguity) ---
+
+    @Override
+    public boolean isPrimitive() {
+      return false;
+    }
+
+    @Override
+    public StructType schema() {
+      // Build StructType from fields — mirrors the StructEncoder.schema() 
default.
+      List<StructField> sfs = new ArrayList<>(encoderFields.size());
+      Iterator<AgnosticEncoders.EncoderField> it = encoderFields.iterator();
+      while (it.hasNext()) {
+        sfs.add(it.next().structField());
+      }
+      return new StructType(sfs.toArray(new StructField[0]));
+    }
+
+    @Override
+    public DataType dataType() {
+      return schema();
+    }
+
+    @Override
+    public ClassTag<T> clsTag() {
+      return (ClassTag<T>) ClassTag.apply(clazz);
+    }
+  }
+
+  /**
+   * Minimal {@link AgnosticEncoder} stub used to carry per-field {@link 
DataType} metadata inside
+   * {@link AgnosticEncoders.EncoderField}. The actual serialization / 
deserialization is handled by
+   * {@link BeamAgnosticEncoder#toCatalyst} and {@link 
BeamAgnosticEncoder#fromCatalyst}.
+   */
+  @SuppressWarnings({"nullness", "unchecked"})
+  private static final class FieldEncoder<V> implements AgnosticEncoder<V> {
+    private final DataType fieldDataType;
+    private final boolean fieldNullable;
+
+    FieldEncoder(DataType dataType, boolean nullable) {
+      this.fieldDataType = dataType;
+      this.fieldNullable = nullable;
+    }
+
+    @Override
+    public boolean isPrimitive() {
+      return false;
+    }
+
+    @Override
+    public DataType dataType() {
+      return fieldDataType;
+    }
+
+    @Override
+    public StructType schema() {
+      return new StructType().add("value", fieldDataType, fieldNullable);
+    }
+
+    @Override
+    public boolean nullable() {
+      return fieldNullable;
+    }
+
+    @Override
+    public ClassTag<V> clsTag() {
+      return (ClassTag<V>) ClassTag.apply(Object.class);
+    }
+  }
+
+  /**
+   * Invoke method {@code fun} on Class {@code cls}, immediately propagating 
{@code null} if any
+   * input arg is {@code null}.
+   */
+  static Expression invokeIfNotNull(Class<?> cls, String fun, DataType type, 
Expression... args) {
+    return invoke(cls, fun, type, true, args);
+  }
+
+  /** Invoke method {@code fun} on Class {@code cls}. */
+  static Expression invoke(Class<?> cls, String fun, DataType type, 
Expression... args) {
+    return invoke(cls, fun, type, false, args);
+  }
+
+  private static Expression invoke(
+      Class<?> cls, String fun, DataType type, boolean propagateNull, 
Expression... args) {
+    try {
+      // To address breaking interfaces between various versions of Spark, 
expressions are
+      // created reflectively. This is fine as it's just needed once to create 
the query plan.
+      switch (STATIC_INVOKE_CONSTRUCTOR.getParameterCount()) {
+        case 6:
+          // Spark 3.1.x
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), propagateNull, true);
+        case 7:
+          // Spark 3.2.0
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), emptyList(), propagateNull, true);
+        case 8:
+          // Spark 3.2.x, 3.3.x
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), emptyList(), propagateNull, true, 
true);
+        case 9:
+          // Spark 4.0.x: added Option<ScalarFunction<?>> parameter
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), emptyList(), propagateNull, true, 
true, Option.empty());
+        default:
+          throw new RuntimeException("Unsupported version of Spark");
+      }
+    } catch (IllegalArgumentException | ReflectiveOperationException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Invoke method {@code fun} on {@code obj} with provided {@code args}. */
+  static Expression invoke(
+      Expression obj, String fun, DataType type, boolean nullable, 
Expression... args) {
+    try {
+      // To address breaking interfaces between various versions of Spark, 
expressions are
+      // created reflectively. This is fine as it's just needed once to create 
the query plan.
+      switch (STATIC_INVOKE_CONSTRUCTOR.getParameterCount()) {
+        case 6:
+          // Spark 3.1.x
+          return INVOKE_CONSTRUCTOR.newInstance(obj, fun, type, seqOf(args), 
false, nullable);
+        case 7:
+          // Spark 3.2.0
+          return INVOKE_CONSTRUCTOR.newInstance(
+              obj, fun, type, seqOf(args), emptyList(), false, nullable);
+        case 8:
+        case 9:
+          // Spark 3.2.x, 3.3.x, 4.0.x: Invoke constructor is 8 params in all 
these versions
+          return INVOKE_CONSTRUCTOR.newInstance(
+              obj, fun, type, seqOf(args), emptyList(), false, nullable, true);
+        default:

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The switch statement here uses 
`STATIC_INVOKE_CONSTRUCTOR.getParameterCount()` to determine the Spark version, 
but then invokes `INVOKE_CONSTRUCTOR`. This is fragile and inconsistent with 
the `newInstance` method below (line 320), which switches on the parameter 
count of the constructor it actually uses. It's safer and clearer to switch on 
`INVOKE_CONSTRUCTOR.getParameterCount()` directly.
   
   ```java
         switch (INVOKE_CONSTRUCTOR.getParameterCount()) {
           case 6:
             // Spark 3.1.x
             return INVOKE_CONSTRUCTOR.newInstance(obj, fun, type, seqOf(args), 
false, nullable);
           case 7:
             // Spark 3.2.0
             return INVOKE_CONSTRUCTOR.newInstance(
                 obj, fun, type, seqOf(args), emptyList(), false, nullable);
           case 8:
             // Spark 3.2.x, 3.3.x, 4.0.x
             return INVOKE_CONSTRUCTOR.newInstance(
                 obj, fun, type, seqOf(args), emptyList(), false, nullable, 
true);
           default:
             throw new RuntimeException("Unsupported version of Spark");
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to