This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e9c039eeaf KAFKA-17097 Add replace.null.with.default configuration to 
ValueToKey and ReplaceField (#16571)
6e9c039eeaf is described below

commit 6e9c039eeaffa4591e226dc4da1199664f7ea2cc
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Jul 15 18:12:59 2024 +0800

    KAFKA-17097 Add replace.null.with.default configuration to ValueToKey and 
ReplaceField (#16571)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/connect/transforms/ReplaceField.java     |  9 +++-
 .../kafka/connect/transforms/ValueToKey.java       |  9 +++-
 .../kafka/connect/transforms/ReplaceFieldTest.java | 62 ++++++++++++++++++++++
 .../kafka/connect/transforms/ValueToKeyTest.java   | 31 +++++++++++
 4 files changed, 107 insertions(+), 4 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 92d69f8adc1..12ddfed0b92 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -54,6 +54,7 @@ public abstract class ReplaceField<R extends 
ConnectRecord<R>> implements Transf
         String INCLUDE = "include";
 
         String RENAME = "renames";
+        String REPLACE_NULL_WITH_DEFAULT_CONFIG = "replace.null.with.default";
     }
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -76,7 +77,9 @@ public abstract class ReplaceField<R extends 
ConnectRecord<R>> implements Transf
                 public String toString() {
                     return "list of colon-delimited pairs, e.g. 
<code>foo:bar,abc:xyz</code>";
                 }
-            }, ConfigDef.Importance.MEDIUM, "Field rename mappings.");
+            }, ConfigDef.Importance.MEDIUM, "Field rename mappings.")
+            .define(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG, 
ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
+                    "Whether to replace fields that have a default value and 
that are null to the default value. When set to true, the default value is 
used, otherwise null is used.");
 
     private static final String PURPOSE = "field replacement";
 
@@ -84,6 +87,7 @@ public abstract class ReplaceField<R extends 
ConnectRecord<R>> implements Transf
     private Set<String> include;
     private Map<String, String> renames;
     private Map<String, String> reverseRenames;
+    private boolean replaceNullWithDefault;
 
     private Cache<Schema, Schema> schemaUpdateCache;
 
@@ -103,6 +107,7 @@ public abstract class ReplaceField<R extends 
ConnectRecord<R>> implements Transf
         include = new HashSet<>(config.getList(ConfigName.INCLUDE));
         renames = parseRenameMappings(config.getList(ConfigName.RENAME));
         reverseRenames = invert(renames);
+        replaceNullWithDefault = 
config.getBoolean(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG);
 
         schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
     }
@@ -180,7 +185,7 @@ public abstract class ReplaceField<R extends 
ConnectRecord<R>> implements Transf
         final Struct updatedValue = new Struct(updatedSchema);
 
         for (Field field : updatedSchema.fields()) {
-            final Object fieldValue = value.get(reverseRenamed(field.name()));
+            final Object fieldValue = replaceNullWithDefault ? 
value.get(reverseRenamed(field.name())) : 
value.getWithoutDefault(reverseRenamed(field.name()));
             updatedValue.put(field.name(), fieldValue);
         }
 
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
index 454ea38f572..24cdec2249a 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -43,14 +43,18 @@ public class ValueToKey<R extends ConnectRecord<R>> 
implements Transformation<R>
     public static final String OVERVIEW_DOC = "Replace the record key with a 
new key formed from a subset of fields in the record value.";
 
     public static final String FIELDS_CONFIG = "fields";
+    public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = 
"replace.null.with.default";
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FIELDS_CONFIG, ConfigDef.Type.LIST, 
ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), 
ConfigDef.Importance.HIGH,
-                    "Field names on the record value to extract as the record 
key.");
+                    "Field names on the record value to extract as the record 
key.")
+            .define(REPLACE_NULL_WITH_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, 
true, ConfigDef.Importance.MEDIUM,
+                    "Whether to replace fields that have a default value and 
that are null to the default value. When set to true, the default value is 
used, otherwise null is used.");
 
     private static final String PURPOSE = "copying fields from value to key";
 
     private List<String> fields;
+    private boolean replaceNullWithDefault;
 
     private Cache<Schema, Schema> valueToKeySchemaCache;
 
@@ -63,6 +67,7 @@ public class ValueToKey<R extends ConnectRecord<R>> 
implements Transformation<R>
     public void configure(Map<String, ?> configs) {
         final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
         fields = config.getList(FIELDS_CONFIG);
+        replaceNullWithDefault = 
config.getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);
         valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache<>(16));
     }
 
@@ -103,7 +108,7 @@ public class ValueToKey<R extends ConnectRecord<R>> 
implements Transformation<R>
 
         final Struct key = new Struct(keySchema);
         for (String field : fields) {
-            key.put(field, value.get(field));
+            key.put(field, replaceNullWithDefault ? value.get(field) : 
value.getWithoutDefault(field));
         }
 
         return record.newRecord(record.topic(), record.kafkaPartition(), 
keySchema, key, value.schema(), value, record.timestamp());
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 53566cada7c..31c24ca5b8a 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -24,16 +24,28 @@ import org.apache.kafka.connect.sink.SinkRecord;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class ReplaceFieldTest {
+    private final ReplaceField<SinkRecord> xformKey = new ReplaceField.Key<>();
     private final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
 
+    public static Stream<Arguments> data() {
+        return Stream.of(
+                Arguments.of(false, null),
+                Arguments.of(true, 42)
+        );
+    }
+
     @AfterEach
     public void teardown() {
         xform.close();
@@ -176,4 +188,54 @@ public class ReplaceFieldTest {
     public void testReplaceFieldVersionRetrievedFromAppInfoParser() {
         assertEquals(AppInfoParser.getVersion(), xform.version());
     }
+
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testReplaceNullWithDefaultConfigOnValue(boolean 
replaceNullWithDefault, Object expectedValue) {
+        final Map<String, String> props = new HashMap<>();
+        props.put("include", "abc");
+        props.put("renames", "abc:optional_with_default");
+        props.put("replace.null.with.default", 
String.valueOf(replaceNullWithDefault));
+
+        xform.configure(props);
+
+        final Schema valueSchema = SchemaBuilder.struct()
+                .field("abc", 
SchemaBuilder.int32().optional().defaultValue(42).build())
+                .build();
+
+        final Struct value = new Struct(valueSchema).put("abc", null);
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, 
valueSchema, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final Struct updatedValue = (Struct) transformedRecord.value();
+
+        assertEquals(1, updatedValue.schema().fields().size());
+        assertEquals(expectedValue, 
updatedValue.getWithoutDefault("optional_with_default"));
+    }
+
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testReplaceNullWithDefaultConfigOnKey(boolean 
replaceNullWithDefault, Object expectedValue) {
+        final Map<String, String> props = new HashMap<>();
+        props.put("include", "abc");
+        props.put("renames", "abc:optional_with_default");
+        props.put("replace.null.with.default", 
String.valueOf(replaceNullWithDefault));
+
+        xformKey.configure(props);
+
+        final Schema keySchema = SchemaBuilder.struct()
+                .field("abc", 
SchemaBuilder.int32().optional().defaultValue(42).build())
+                .build();
+
+        final Struct key = new Struct(keySchema).put("abc", null);
+
+        final SinkRecord record = new SinkRecord("test", 0, keySchema, key, 
null, null, 0);
+        final SinkRecord transformedRecord = xformKey.apply(record);
+
+        final Struct updatedKey = (Struct) transformedRecord.key();
+
+        assertEquals(1, updatedKey.schema().fields().size());
+        assertEquals(expectedValue, 
updatedKey.getWithoutDefault("optional_with_default"));
+    }
 }
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index a7d032009e5..df528cf518a 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -25,9 +25,14 @@ import org.apache.kafka.connect.sink.SinkRecord;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -36,6 +41,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class ValueToKeyTest {
     private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
 
+    public static Stream<Arguments> data() {
+        return Stream.of(
+                Arguments.of(false, null),
+                Arguments.of(true, 42)
+        );
+    }
+
     @AfterEach
     public void teardown() {
         xform.close();
@@ -113,4 +125,23 @@ public class ValueToKeyTest {
     public void testValueToKeyVersionRetrievedFromAppInfoParser() {
         assertEquals(AppInfoParser.getVersion(), xform.version());
     }
+
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testReplaceNullWithDefaultConfig(boolean 
replaceNullWithDefault, Object expectedValue) {
+        Map<String, Object> config = new HashMap<>();
+        config.put("fields", "optional_with_default");
+        config.put("replace.null.with.default", replaceNullWithDefault);
+        xform.configure(config);
+
+        final Schema valueSchema = SchemaBuilder.struct()
+                .field("optional_with_default", 
SchemaBuilder.int32().optional().defaultValue(42).build())
+                .build();
+        final Struct value = new 
Struct(valueSchema).put("optional_with_default", null);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, 
valueSchema, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertEquals(expectedValue, ((Struct) 
transformedRecord.key()).getWithoutDefault("optional_with_default"));
+    }
 }

Reply via email to