This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e9c039eeaf KAFKA-17097 Add replace.null.with.default configuration to
ValueToKey and ReplaceField (#16571)
6e9c039eeaf is described below
commit 6e9c039eeaffa4591e226dc4da1199664f7ea2cc
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Jul 15 18:12:59 2024 +0800
KAFKA-17097 Add replace.null.with.default configuration to ValueToKey and
ReplaceField (#16571)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/connect/transforms/ReplaceField.java | 9 +++-
.../kafka/connect/transforms/ValueToKey.java | 9 +++-
.../kafka/connect/transforms/ReplaceFieldTest.java | 62 ++++++++++++++++++++++
.../kafka/connect/transforms/ValueToKeyTest.java | 31 +++++++++++
4 files changed, 107 insertions(+), 4 deletions(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 92d69f8adc1..12ddfed0b92 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -54,6 +54,7 @@ public abstract class ReplaceField<R extends
ConnectRecord<R>> implements Transf
String INCLUDE = "include";
String RENAME = "renames";
+ String REPLACE_NULL_WITH_DEFAULT_CONFIG = "replace.null.with.default";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -76,7 +77,9 @@ public abstract class ReplaceField<R extends
ConnectRecord<R>> implements Transf
public String toString() {
return "list of colon-delimited pairs, e.g.
<code>foo:bar,abc:xyz</code>";
}
- }, ConfigDef.Importance.MEDIUM, "Field rename mappings.");
+ }, ConfigDef.Importance.MEDIUM, "Field rename mappings.")
+ .define(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG,
ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
+ "Whether to replace fields that have a default value and
that are null to the default value. When set to true, the default value is
used, otherwise null is used.");
private static final String PURPOSE = "field replacement";
@@ -84,6 +87,7 @@ public abstract class ReplaceField<R extends
ConnectRecord<R>> implements Transf
private Set<String> include;
private Map<String, String> renames;
private Map<String, String> reverseRenames;
+ private boolean replaceNullWithDefault;
private Cache<Schema, Schema> schemaUpdateCache;
@@ -103,6 +107,7 @@ public abstract class ReplaceField<R extends
ConnectRecord<R>> implements Transf
include = new HashSet<>(config.getList(ConfigName.INCLUDE));
renames = parseRenameMappings(config.getList(ConfigName.RENAME));
reverseRenames = invert(renames);
+ replaceNullWithDefault =
config.getBoolean(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG);
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
}
@@ -180,7 +185,7 @@ public abstract class ReplaceField<R extends
ConnectRecord<R>> implements Transf
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : updatedSchema.fields()) {
- final Object fieldValue = value.get(reverseRenamed(field.name()));
+ final Object fieldValue = replaceNullWithDefault ?
value.get(reverseRenamed(field.name())) :
value.getWithoutDefault(reverseRenamed(field.name()));
updatedValue.put(field.name(), fieldValue);
}
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
index 454ea38f572..24cdec2249a 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -43,14 +43,18 @@ public class ValueToKey<R extends ConnectRecord<R>>
implements Transformation<R>
public static final String OVERVIEW_DOC = "Replace the record key with a
new key formed from a subset of fields in the record value.";
public static final String FIELDS_CONFIG = "fields";
+ public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG =
"replace.null.with.default";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELDS_CONFIG, ConfigDef.Type.LIST,
ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(),
ConfigDef.Importance.HIGH,
- "Field names on the record value to extract as the record
key.");
+ "Field names on the record value to extract as the record
key.")
+ .define(REPLACE_NULL_WITH_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN,
true, ConfigDef.Importance.MEDIUM,
+ "Whether to replace fields that have a default value and
that are null to the default value. When set to true, the default value is
used, otherwise null is used.");
private static final String PURPOSE = "copying fields from value to key";
private List<String> fields;
+ private boolean replaceNullWithDefault;
private Cache<Schema, Schema> valueToKeySchemaCache;
@@ -63,6 +67,7 @@ public class ValueToKey<R extends ConnectRecord<R>>
implements Transformation<R>
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
fields = config.getList(FIELDS_CONFIG);
+ replaceNullWithDefault =
config.getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);
valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache<>(16));
}
@@ -103,7 +108,7 @@ public class ValueToKey<R extends ConnectRecord<R>>
implements Transformation<R>
final Struct key = new Struct(keySchema);
for (String field : fields) {
- key.put(field, value.get(field));
+ key.put(field, replaceNullWithDefault ? value.get(field) :
value.getWithoutDefault(field));
}
return record.newRecord(record.topic(), record.kafkaPartition(),
keySchema, key, value.schema(), value, record.timestamp());
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 53566cada7c..31c24ca5b8a 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -24,16 +24,28 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ReplaceFieldTest {
+ private final ReplaceField<SinkRecord> xformKey = new ReplaceField.Key<>();
private final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+ public static Stream<Arguments> data() {
+ return Stream.of(
+ Arguments.of(false, null),
+ Arguments.of(true, 42)
+ );
+ }
+
@AfterEach
public void teardown() {
xform.close();
@@ -176,4 +188,54 @@ public class ReplaceFieldTest {
public void testReplaceFieldVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
+
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testReplaceNullWithDefaultConfigOnValue(boolean
replaceNullWithDefault, Object expectedValue) {
+ final Map<String, String> props = new HashMap<>();
+ props.put("include", "abc");
+ props.put("renames", "abc:optional_with_default");
+ props.put("replace.null.with.default",
String.valueOf(replaceNullWithDefault));
+
+ xform.configure(props);
+
+ final Schema valueSchema = SchemaBuilder.struct()
+ .field("abc",
SchemaBuilder.int32().optional().defaultValue(42).build())
+ .build();
+
+ final Struct value = new Struct(valueSchema).put("abc", null);
+
+ final SinkRecord record = new SinkRecord("test", 0, null, null,
valueSchema, value, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ final Struct updatedValue = (Struct) transformedRecord.value();
+
+ assertEquals(1, updatedValue.schema().fields().size());
+ assertEquals(expectedValue,
updatedValue.getWithoutDefault("optional_with_default"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testReplaceNullWithDefaultConfigOnKey(boolean
replaceNullWithDefault, Object expectedValue) {
+ final Map<String, String> props = new HashMap<>();
+ props.put("include", "abc");
+ props.put("renames", "abc:optional_with_default");
+ props.put("replace.null.with.default",
String.valueOf(replaceNullWithDefault));
+
+ xformKey.configure(props);
+
+ final Schema keySchema = SchemaBuilder.struct()
+ .field("abc",
SchemaBuilder.int32().optional().defaultValue(42).build())
+ .build();
+
+ final Struct key = new Struct(keySchema).put("abc", null);
+
+ final SinkRecord record = new SinkRecord("test", 0, keySchema, key,
null, null, 0);
+ final SinkRecord transformedRecord = xformKey.apply(record);
+
+ final Struct updatedKey = (Struct) transformedRecord.key();
+
+ assertEquals(1, updatedKey.schema().fields().size());
+ assertEquals(expectedValue,
updatedKey.getWithoutDefault("optional_with_default"));
+ }
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index a7d032009e5..df528cf518a 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -25,9 +25,14 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -36,6 +41,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class ValueToKeyTest {
private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+ public static Stream<Arguments> data() {
+ return Stream.of(
+ Arguments.of(false, null),
+ Arguments.of(true, 42)
+ );
+ }
+
@AfterEach
public void teardown() {
xform.close();
@@ -113,4 +125,23 @@ public class ValueToKeyTest {
public void testValueToKeyVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
+
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testReplaceNullWithDefaultConfig(boolean
replaceNullWithDefault, Object expectedValue) {
+ Map<String, Object> config = new HashMap<>();
+ config.put("fields", "optional_with_default");
+ config.put("replace.null.with.default", replaceNullWithDefault);
+ xform.configure(config);
+
+ final Schema valueSchema = SchemaBuilder.struct()
+ .field("optional_with_default",
SchemaBuilder.int32().optional().defaultValue(42).build())
+ .build();
+ final Struct value = new
Struct(valueSchema).put("optional_with_default", null);
+
+ final SinkRecord record = new SinkRecord("", 0, null, null,
valueSchema, value, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ assertEquals(expectedValue, ((Struct)
transformedRecord.key()).getWithoutDefault("optional_with_default"));
+ }
}