gemini-code-assist[bot] commented on code in PR #38868:
URL: https://github.com/apache/beam/pull/38868#discussion_r3398622735
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java:
##########
@@ -193,11 +194,41 @@ private PCollection<Row> standardJoin(
}
// Flatten the lhs and rhs fields into a single row.
+ FieldAccessDescriptor flattenFields =
+ FieldAccessDescriptor.withFieldNames(
+ org.apache.beam.sdk.schemas.transforms.Join.LHS_TAG + ".*",
+ org.apache.beam.sdk.schemas.transforms.Join.RHS_TAG + ".*");
+
+ // Reconcile the desired output schema (which carries the Calcite-derived field names and the
+ // correct top-level, outer-join-aware nullability) with the types the data actually carries.
+ // Calcite's join row-type derivation can report a different nullability than the rows hold --
+ // in particular it can mark the fields nested inside a struct column as nullable even when the
+ // joined rows still keep them NOT NULL. Forcing the Calcite schema verbatim then trips Select's
+ // type-equality guard. The flatten emits the lhs struct's fields followed by the rhs struct's
+ // fields, so walk the Calcite output positionally against those data fields, keeping Calcite's
+ // names and top-level nullability but adopting the data's (possibly deeper) field types.
+ Schema calciteSchema = CalciteUtils.toSchema(getRowType());
+ Schema joinedSchema = joinedRows.getSchema();
+ List<Schema.Field> dataFields = new java.util.ArrayList<>();
+ dataFields.addAll(
+ Preconditions.checkArgumentNotNull(joinedSchema.getField(0).getType().getRowSchema())
+ .getFields());
+ dataFields.addAll(
+ Preconditions.checkArgumentNotNull(joinedSchema.getField(1).getType().getRowSchema())
+ .getFields());
+ Schema.Builder reconciled = Schema.builder();
+ for (int i = 0; i < calciteSchema.getFieldCount(); i++) {
Review Comment:

To prevent an unexpected `IndexOutOfBoundsException` in the loop, check that
the field count of `calciteSchema` matches the size of `dataFields` before
iterating. The loop walks the two positionally, so a mismatch should fail fast
with a descriptive error.
```suggestion
if (calciteSchema.getFieldCount() != dataFields.size()) {
  throw new IllegalStateException(
      String.format(
          "Field count mismatch: Calcite schema has %d fields, but data schema has %d fields",
          calciteSchema.getFieldCount(), dataFields.size()));
}
Schema.Builder reconciled = Schema.builder();
for (int i = 0; i < calciteSchema.getFieldCount(); i++) {
```
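
For illustration, here is a minimal standalone sketch of the same fail-fast
check combined with the positional reconciliation described in the diff
comment (keep Calcite's names and top-level nullability, adopt the data's
field types). It deliberately avoids Beam types: `ReconcileSketch`, `Field`,
and `reconcile` are hypothetical stand-ins, and the field type is reduced to a
plain string label, so this is a sketch of the idea and not the PR's
implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: Field is a hypothetical stand-in, not Beam's Schema.Field.
final class ReconcileSketch {
  static final class Field {
    final String name;
    final String type; // simplified type label; an assumption for this sketch
    final boolean nullable;

    Field(String name, String type, boolean nullable) {
      this.name = name;
      this.type = type;
      this.nullable = nullable;
    }
  }

  // Walks both lists positionally: name and top-level nullability come from the
  // Calcite-derived field, the type comes from the field the data actually carries.
  static List<Field> reconcile(List<Field> calciteFields, List<Field> dataFields) {
    // Fail fast with a descriptive message instead of an
    // IndexOutOfBoundsException partway through the loop.
    if (calciteFields.size() != dataFields.size()) {
      throw new IllegalStateException(
          String.format(
              "Field count mismatch: Calcite schema has %d fields, but data schema has %d fields",
              calciteFields.size(), dataFields.size()));
    }
    List<Field> reconciled = new ArrayList<>(calciteFields.size());
    for (int i = 0; i < calciteFields.size(); i++) {
      Field calcite = calciteFields.get(i);
      Field data = dataFields.get(i);
      reconciled.add(new Field(calcite.name, data.type, calcite.nullable));
    }
    return reconciled;
  }
}
```

With the check in place, a count mismatch surfaces as one error that names
both counts, which is easier to diagnose than an index failure inside the loop.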