wombatu-kun opened a new issue, #16601:
URL: https://github.com/apache/iceberg/issues/16601

   ### Problem
   
   `KafkaMetadataTransform` stores its per-instance configuration in a `static` field:
   
   ```java
   private static RecordAppender recordAppender;
   ...
   @Override
   public void configure(Map<String, ?> configs) {
     recordAppender = getRecordAppender(configs);
   }
   ```
   
   `recordAppender` captures everything the SMT was configured with: the metadata field name or prefix, whether metadata is nested or flattened, and any external field. Because the field is `static`, every `KafkaMetadataTransform` instance in the worker JVM shares it. Kafka Connect creates a separate transform instance for each configured transform alias in each task and calls `configure()` on every one of them, so the last `configure()` call wins for all instances.
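
   The "last `configure()` wins" behavior can be reproduced without Kafka at all. The sketch below is a hypothetical, simplified stand-in: `StaticConfigDemo`, `BuggyTransform` and `topicField()` are illustrative names, and a plain `String` prefix takes the place of the real `RecordAppender`. Only the `static` field pattern mirrors the actual class.

   ```java
   import java.util.Map;

   // Hypothetical, simplified stand-in for KafkaMetadataTransform: a plain String
   // prefix replaces the real RecordAppender, and no Kafka Connect types are used.
   public class StaticConfigDemo {

     public static class BuggyTransform {
       // BUG (mirrors the issue): one static slot shared by every instance in the JVM.
       private static String prefix;

       public void configure(Map<String, ?> configs) {
         prefix = (String) configs.get("field_name");
       }

       // Name of the topic metadata field this instance would emit.
       public String topicField() {
         return prefix + "_topic";
       }
     }

     public static void main(String[] args) {
       BuggyTransform first = new BuggyTransform();
       BuggyTransform second = new BuggyTransform();
       first.configure(Map.of("field_name", "aaa"));
       second.configure(Map.of("field_name", "bbb"));

       // The second configure() call overwrote the shared field, so both print "bbb_topic".
       System.out.println(first.topicField());
       System.out.println(second.topicField());
     }
   }
   ```

   Because `configure()` assigns to the class-level field rather than to `this`, the output depends only on which instance was configured most recently, not on which instance is doing the work.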
   
   ### Impact
   
   When two or more `KafkaMetadataTransform` instances are configured differently in the same worker (for example, two connectors that both use the SMT, or two transform aliases in one chain), every instance emits the field names and structure of whichever instance was configured last. The output is silently wrong: records get the wrong metadata field names or the wrong nested/flat shape. It is also a thread-safety hazard: tasks run on separate threads, and the static field is written by each task's `configure()` and read by each task's `apply()` without any synchronization.
   
   Sibling SMTs in the same module (`CopyValue`, `DebeziumTransform`) correctly hold their configuration in instance fields; `KafkaMetadataTransform` is the only one using a static field.
   
   ### Reproduction
   
   This test fails on current code. The second (last-configured) instance behaves correctly, but the first instance picks up the second's configuration:
   
   ```java
   @Test
   public void testConfigIsNotSharedAcrossInstances() {
     SinkRecord record =
         new SinkRecord(
             TOPIC, PARTITION, null, null, null, VALUE_MAP, OFFSET, TIMESTAMP, TimestampType.CREATE_TIME);
     try (KafkaMetadataTransform first = new KafkaMetadataTransform();
         KafkaMetadataTransform second = new KafkaMetadataTransform()) {
       first.configure(ImmutableMap.of("field_name", "aaa"));
       second.configure(ImmutableMap.of("field_name", "bbb"));
   
       // the second (last-configured) instance works correctly today: it uses its own "bbb" prefix
       Map<?, ?> secondValue = (Map<?, ?>) second.apply(record).value();
       assertThat(secondValue.get("bbb_topic")).isEqualTo(TOPIC);
       assertThat(secondValue.get("aaa_topic")).isNull();
   
       // the first instance must also use its own "aaa" prefix, not the second's "bbb"; today this fails
       Map<?, ?> firstValue = (Map<?, ?>) first.apply(record).value();
       assertThat(firstValue.get("aaa_topic")).isEqualTo(TOPIC);
       assertThat(firstValue.get("bbb_topic")).isNull();
     }
   }
   ```
   
   Failure:
   
   ```
   org.opentest4j.AssertionFailedError:
   expected: "topic"
    but was: null
   ```
   
   ### Fix
   
   Make `recordAppender` an instance field (remove `static`). All reads are already in instance methods, so the change is minimal and matches the other SMTs in the module. There is no measurable cost: the appender is still built once per instance in `configure()`.
   