rayhondo opened a new pull request, #16249:
URL: https://github.com/apache/iceberg/pull/16249
## Problem
`TableMetadataCache.schema()` in `DynamicIcebergSink` iterates **all
historical table schemas** when looking for a write target, returning on the
first `SAME` match from `CompareSchemasVisitor`. Because
`CompareSchemasVisitor` matches by name+type (not field IDs), and because older
schemas have lower IDs that are iterated first, the sink deterministically uses
a historical schema's field IDs even when the current table schema has evolved
past it.
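The shadowing can be reproduced in isolation with a minimal sketch. It uses only the JDK, and everything in it is a hypothetical stand-in rather than Iceberg code: a schema is modelled as a map from `name:type` to field ID, the comparison ignores IDs, and a `TreeMap` is assumed in order to model the ascending schema-ID iteration described above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the lookup; none of these names are Iceberg APIs.
final class HistoricalShadowingSketch {
  // Each schema maps "name:type" to a field ID (a deliberate simplification).
  static final Map<String, Integer> SCHEMA_0 =
      Map.of("id:long", 1, "data:string", 2, "extra:string", 3);
  // Schema after deleting "extra"; this is the current schema.
  static final Map<String, Integer> SCHEMA_1 = Map.of("id:long", 1, "data:string", 2);

  // Stand-in for a name+type comparison: field IDs (the map values) are ignored.
  static boolean sameByNameAndType(Map<String, Integer> input, Map<String, Integer> table) {
    return input.keySet().equals(table.keySet());
  }

  // Scans schemas in ascending schema-ID order and returns the first matching ID, or -1.
  static int firstMatch(
      TreeMap<Integer, Map<String, Integer>> schemas, Map<String, Integer> input) {
    for (Map.Entry<Integer, Map<String, Integer>> entry : schemas.entrySet()) {
      if (sameByNameAndType(input, entry.getValue())) {
        return entry.getKey();
      }
    }
    return -1;
  }

  // All schemas the table has ever had, keyed by schema ID.
  static TreeMap<Integer, Map<String, Integer>> allSchemas() {
    TreeMap<Integer, Map<String, Integer>> schemas = new TreeMap<>();
    schemas.put(0, SCHEMA_0);
    schemas.put(1, SCHEMA_1);
    return schemas;
  }

  public static void main(String[] args) {
    // The producer still emits the pre-delete shape; its own field IDs are irrelevant.
    Map<String, Integer> input = Map.of("id:long", 10, "data:string", 11, "extra:string", 12);
    System.out.println("matched schema id: " + firstMatch(allSchemas(), input));
  }
}
```

In this model the lookup returns schema 0 even though schema 1 is current, which is the shape of the bug.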
## Reproduction
1. Create an Iceberg table with schema `[id, data, extra]` (e.g. a full
Debezium CDC envelope with `op`, `after`, `before`, `source`, `ts_ms`,
`transaction`).
2. Delete a column: `table.updateSchema().deleteColumn("extra").commit()`.
3. Send a `DynamicRecord` whose schema has the same structure as the
original (step 1).
4. `TableMetadataCache` finds schema id=0 (historical, full) as `SAME` and
uses its field IDs.
5. Data is written with the old field IDs; the current schema (without `extra`)
cannot resolve them on read, so the affected columns come back as null.
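One way the null can surface is sketched below in plain Java. It is an illustrative model only, not Iceberg's reader: a stored row is a map from field ID to value, and a read schema maps column names to field IDs. The sketch assumes, beyond what the steps above state, that `extra` is later re-added and therefore receives a new field ID (4 here, a hypothetical value), since added columns are assigned fresh IDs.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of ID-based column resolution; not an Iceberg class.
final class FieldIdReadSketch {
  // Projects a stored row (field ID -> value) onto a read schema (column name -> field ID).
  // A column whose field ID is absent from the stored row reads as null.
  static Map<String, Object> read(Map<Integer, Object> storedRow, Map<String, Integer> readSchema) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> column : readSchema.entrySet()) {
      result.put(column.getKey(), storedRow.get(column.getValue()));
    }
    return result;
  }

  public static void main(String[] args) {
    // Row written with the historical schema's field IDs 1, 2, 3.
    Map<Integer, Object> storedRow = new HashMap<>();
    storedRow.put(1, 42L);
    storedRow.put(2, "payload");
    storedRow.put(3, "written under the old ID");

    // Assumed read schema: "extra" was re-added and now carries field ID 4.
    Map<String, Integer> readSchema = new LinkedHashMap<>();
    readSchema.put("id", 1);
    readSchema.put("data", 2);
    readSchema.put("extra", 4);

    System.out.println(read(storedRow, readSchema));
  }
}
```

The value is physically present under ID 3, but nothing in the read schema asks for that ID, so no error is raised and the column reads as null.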
## Impact
Silent data corruption. No exception is thrown. Writes "succeed" but
affected columns return null on read indefinitely. Affects any table where
columns were deleted after initial creation when an upstream producer continues
to emit the pre-delete shape.
## Root cause
`TableMetadataCache.schema()` (`flink/v2.0/.../TableMetadataCache.java`,
`flink/v2.1/...` identical):
```java
for (Map.Entry<Integer, Schema> tableSchema : cached.tableSchemas.entrySet()) {
  Result result = CompareSchemasVisitor.visit(input, tableSchema.getValue(), ...);
  if (result == SAME) {
    return new ResolvedSchemaInfo(tableSchema.getValue(), SAME, identity());
  }
  ...
}
```
`tableSchemas` is populated from `table.schemas()` (all historical schemas,
not just current).
## Fix
Resolve only against the current schema. `CacheItem` now tracks
`currentSchemaId`, populated from `table.schema().schemaId()` in `update()`. The
historical iteration is removed, because writing with a historical schema's
field IDs is always wrong once the current schema has evolved past it.
```java
Schema currentSchema = cached.tableSchemas.get(cached.currentSchemaId);
if (currentSchema != null) {
  Result result =
      CompareSchemasVisitor.visit(input, currentSchema, caseSensitive, dropUnusedColumns);
  if (result == SAME) {
    return new ResolvedSchemaInfo(currentSchema, SAME, identity());
  }
  if (result == DATA_CONVERSION_NEEDED) {
    compatible = currentSchema;
  }
  // SCHEMA_UPDATE_NEEDED falls through to evolution as before
}
```
Applied identically to `flink/v2.0` and `flink/v2.1`. Not applied to
`flink/v1.20` in this PR (happy to add if reviewers prefer one PR; the bug
exists there too).
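The effect of restricting the lookup to the current schema can be sketched with the JDK alone. The class, enum, and method names below are hypothetical, a schema is reduced to a set of `name:type` strings, and the `DATA_CONVERSION_NEEDED` branch is omitted for brevity.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of current-only resolution; not the patched Iceberg code.
final class CurrentOnlyResolutionSketch {
  enum Result { SAME, SCHEMA_UPDATE_NEEDED }

  // Compares the input only against the schema registered under currentSchemaId.
  static Result resolve(
      Map<Integer, Set<String>> schemas, int currentSchemaId, Set<String> input) {
    Set<String> current = schemas.get(currentSchemaId);
    if (current != null && current.equals(input)) {
      return Result.SAME;
    }
    // Anything else is left to schema evolution, including historical shapes.
    return Result.SCHEMA_UPDATE_NEEDED;
  }

  public static void main(String[] args) {
    Map<Integer, Set<String>> schemas =
        Map.of(
            0, Set.of("id:long", "data:string", "extra:string"), // historical
            1, Set.of("id:long", "data:string")); // current

    Set<String> preDeleteInput = Set.of("id:long", "data:string", "extra:string");
    System.out.println(resolve(schemas, 1, preDeleteInput));
  }
}
```

With schema 1 as current, the pre-delete input no longer resolves as `SAME` in this model; it is routed to evolution instead of silently reusing schema 0.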
## Tests
Added `testHistoricalSchemaDoesNotShadowCurrentAfterColumnDelete` to
`TestTableMetadataCache` in both v2.0 and v2.1. Creates a table with full
schema, deletes a column, then asserts that submitting the original
(pre-delete) schema returns `NOT_FOUND` (i.e. triggers proper schema evolution)
rather than `SAME` against the historical schema.
## Behavior change
Previously, an input schema structurally identical to *any* historical
schema resolved as `SAME`. Now only the current schema can produce `SAME`. The
previous behavior was the bug; any caller depending on it was producing corrupt
writes.
## Versions
Confirmed in 1.10.0 and 1.10.1; `TableMetadataCache` logic is identical. Not
fixed in any released version as of this PR.
## AI-assisted contribution disclosure
Per the [AI-assisted contributions
guidelines](https://iceberg.apache.org/contribute/#guidelines-for-ai-assisted-contributions):
AI assistance was used to draft the patch and tests. The author traced the
root cause end-to-end against the upstream source, verified the fix matches the
documented intent of `TableMetadataCache`, and ran the affected module tests +
spotlessCheck locally. Reviewers may want to focus on whether removing
historical iteration affects any code path the author has not considered (none
found in `iceberg-flink`).