wombatu-kun opened a new issue, #16601:
URL: https://github.com/apache/iceberg/issues/16601
### Problem
`KafkaMetadataTransform` stores its per-instance configuration in a `static`
field:
```java
private static RecordAppender recordAppender;
...
@Override
public void configure(Map<String, ?> configs) {
  recordAppender = getRecordAppender(configs);
}
```
`recordAppender` captures everything the SMT was configured with: the
metadata field name/prefix, whether metadata is nested vs flattened, and any
external field. Because the field is `static`, it is shared by every
`KafkaMetadataTransform` instance in the worker JVM. Kafka Connect creates a
separate transform instance per use and calls `configure()` on each, so the
last `configure()` call wins for all instances.
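The sharing described above can be reproduced without Kafka Connect. The following is a minimal sketch, not the actual transform: `StaticConfigTransform` and `fieldName()` are hypothetical stand-ins, and only the `static` field pattern and the `field_name` config key are taken from this report.

```java
import java.util.Map;

// Hypothetical demo class; the first class in the file so it can be run
// directly with the single-file source launcher.
class StaticFieldDemo {
  public static void main(String[] args) {
    StaticConfigTransform first = new StaticConfigTransform();
    StaticConfigTransform second = new StaticConfigTransform();
    first.configure(Map.of("field_name", "aaa"));
    second.configure(Map.of("field_name", "bbb"));
    // Both instances report the prefix of the last configure() call.
    System.out.println(first.fieldName());  // prints "bbb_topic"
    System.out.println(second.fieldName()); // prints "bbb_topic"
  }
}

// Hypothetical stand-in for an SMT that keeps its config in a static field.
class StaticConfigTransform {
  // One slot per class, shared by every instance in the JVM.
  private static String prefix;

  void configure(Map<String, ?> configs) {
    prefix = (String) configs.get("field_name");
  }

  String fieldName() {
    return prefix + "_topic";
  }
}
```

The first instance never sees its own `"aaa"` setting again once the second instance is configured.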
### Impact
When two or more `KafkaMetadataTransform` instances are configured
differently in the same worker (for example two connectors that both use the
SMT, or two transform aliases in one chain), every instance emits the field
names and structure of whichever instance was configured last. The output is
silently wrong: records get the wrong metadata field names, or the wrong
nested/flat shape. It is also a thread-safety hazard, since the field is
mutated from each task's `configure()` and read from each task's `apply()`.
Sibling SMTs in the same module (`CopyValue`, `DebeziumTransform`) correctly
hold their configuration in instance fields; `KafkaMetadataTransform` is the
only one using a static field.
### Reproduction
This test fails on current code. The second (last-configured) instance
behaves correctly, but the first instance picks up the second's configuration:
```java
@Test
public void testConfigIsNotSharedAcrossInstances() {
  SinkRecord record =
      new SinkRecord(
          TOPIC, PARTITION, null, null, null, VALUE_MAP, OFFSET, TIMESTAMP,
          TimestampType.CREATE_TIME);
  try (KafkaMetadataTransform first = new KafkaMetadataTransform();
      KafkaMetadataTransform second = new KafkaMetadataTransform()) {
    first.configure(ImmutableMap.of("field_name", "aaa"));
    second.configure(ImmutableMap.of("field_name", "bbb"));
    // the second (last-configured) instance works correctly today: it uses
    // its own "bbb" prefix
    Map<?, ?> secondValue = (Map<?, ?>) second.apply(record).value();
    assertThat(secondValue.get("bbb_topic")).isEqualTo(TOPIC);
    assertThat(secondValue.get("aaa_topic")).isNull();
    // the first instance must also use its own "aaa" prefix, not the
    // second's "bbb"; today this fails
    Map<?, ?> firstValue = (Map<?, ?>) first.apply(record).value();
    assertThat(firstValue.get("aaa_topic")).isEqualTo(TOPIC);
    assertThat(firstValue.get("bbb_topic")).isNull();
  }
}
```
Failure:
```
org.opentest4j.AssertionFailedError:
expected: "topic"
but was: null
```
### Fix
Make `recordAppender` an instance field (remove `static`). All reads are
already in instance methods, so the change is minimal and matches the other
SMTs in the module. There is no measurable cost: the appender is still built
once per instance in `configure()`.
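As a rough illustration of the intended behavior after the change, here is a minimal sketch using a hypothetical `InstanceConfigTransform` class (not the real `KafkaMetadataTransform`; `fieldName()` is invented for the demo). The only difference from the problematic pattern is that the field is no longer `static`.

```java
import java.util.Map;

// Hypothetical demo class; the first class in the file so it can be run
// directly with the single-file source launcher.
class InstanceFieldDemo {
  public static void main(String[] args) {
    InstanceConfigTransform first = new InstanceConfigTransform();
    InstanceConfigTransform second = new InstanceConfigTransform();
    first.configure(Map.of("field_name", "aaa"));
    second.configure(Map.of("field_name", "bbb"));
    // Each instance keeps the prefix it was configured with.
    System.out.println(first.fieldName());  // prints "aaa_topic"
    System.out.println(second.fieldName()); // prints "bbb_topic"
  }
}

// Hypothetical stand-in for an SMT that keeps its config per instance.
class InstanceConfigTransform {
  // Instance field: every transform object has its own copy.
  private String prefix;

  void configure(Map<String, ?> configs) {
    prefix = (String) configs.get("field_name");
  }

  String fieldName() {
    return prefix + "_topic";
  }
}
```

In the real class the equivalent change would presumably be dropping `static` from the `recordAppender` declaration, leaving `configure()` untouched.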