rhauch commented on code in PR #11442:
URL: https://github.com/apache/kafka/pull/11442#discussion_r899523942
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java:
##########
@@ -154,4 +161,14 @@ protected static Object updateSchemaIn(Object keyOrValue, Schema updatedSchema)
}
return keyOrValue;
}
+
+ private static String buildSchemaName(String namespace, String name) {
+ if (Utils.isBlank(namespace)) {
+ return name;
+ } else if (!Utils.isBlank(name)) {
+ return namespace + "." + name;
Review Comment:
What happens if the `namespace` ends with a `.`? We should probably handle
that case more transparently. Unit tests should verify that
`schema.namespace=a.b.c.` and `schema.namespace=a.b.c` are treated the same.
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java:
##########
@@ -35,29 +37,34 @@
private static final Logger log = LoggerFactory.getLogger(SetSchemaMetadata.class);
public static final String OVERVIEW_DOC =
- "Set the schema name, version or both on the record's key (<code>" + Key.class.getName() + "</code>)"
- + " or value (<code>" + Value.class.getName() + "</code>) schema.";
+ "Set the name, namespace and version on the schema of the record's key " +
+ "(<code>" + Key.class.getName() + "</code>) or value " +
+ "(<code>" + Value.class.getName() + "</code>).";
private interface ConfigName {
String SCHEMA_NAME = "schema.name";
+ String SCHEMA_NAMESPACE = "schema.namespace";
String SCHEMA_VERSION = "schema.version";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.")
+ .define(ConfigName.SCHEMA_NAMESPACE, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Namespace to append to the schema name.")
Review Comment:
Nit: aren't we _prepending_ the namespace, rather than _appending_ the
namespace?
```suggestion
.define(ConfigName.SCHEMA_NAMESPACE, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Namespace to prepend to the schema name.")
```
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java:
##########
@@ -154,4 +161,14 @@ protected static Object updateSchemaIn(Object keyOrValue, Schema updatedSchema)
}
return keyOrValue;
}
+
+ private static String buildSchemaName(String namespace, String name) {
Review Comment:
I know that we're trying to cover all of the cases in the tests that call
the transform, but it'd be good to also have test case methods that check the
different permutations of inputs to this method, including null, empty strings,
and blank strings for names and namespaces, namespaces with and without a
trailing `.`, namespaces with multiple `.` characters, namespaces with leading
and/or trailing whitespace, etc.
In addition to such tests being easier to write and understand, they help
test all possible combinations or code paths _and_ they help identify expected
behaviors in various scenarios to better catch regressions in the future.
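A table-driven check along these lines could cover the namespace permutations
listed above. This is only a sketch: `BuildSchemaNamePermutations`, the local
`isBlank` helper (standing in for `Utils.isBlank`), and the expected values are
assumptions based on trimming the namespace and stripping a single trailing
`.`; they are not the PR's actual code or tests.

```java
import java.util.Objects;

// Hypothetical, self-contained sketch; not the PR's test class.
final class BuildSchemaNamePermutations {

    // Local stand-in for Utils.isBlank: null, empty, or whitespace-only.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Assumed behavior: ignore a blank namespace; otherwise trim it,
    // strip one trailing '.', and prepend it to the name.
    static String buildSchemaName(String namespace, String name) {
        if (isBlank(namespace)) {
            return name;
        }
        String normalized = namespace.trim().replaceAll("[.]$", "");
        return normalized + "." + name;
    }

    // Each row is {namespace, name, expected}.
    static final String[][] CASES = {
        {null,       "Foo", "Foo"},
        {"",         "Foo", "Foo"},
        {"   ",      "Foo", "Foo"},
        {"a",        "Foo", "a.Foo"},
        {"a.b.c",    "Foo", "a.b.c.Foo"},
        {"a.b.c.",   "Foo", "a.b.c.Foo"},
        {" a.b.c ",  "Foo", "a.b.c.Foo"},
        {" a.b.c. ", "Foo", "a.b.c.Foo"},
    };

    // Returns the number of rows whose actual result differs from the expected one.
    static int countFailures() {
        int failures = 0;
        for (String[] c : CASES) {
            String actual = buildSchemaName(c[0], c[1]);
            if (!Objects.equals(actual, c[2])) {
                failures++;
                System.out.println("FAIL: namespace=[" + c[0] + "] name=[" + c[1] + "] -> " + actual);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(countFailures() + " failing case(s)");
    }
}
```

Keeping the inputs and expectations in one table makes it cheap to add a row
whenever a new edge case comes up in review.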
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java:
##########
@@ -71,7 +78,7 @@ public R apply(R record) {
schema.type(),
schema.isOptional(),
schema.defaultValue(),
- schemaName != null ? schemaName : schema.name(),
+ buildSchemaName(schemaNamespace, schemaName != null ? schemaName : schema.name()),
Review Comment:
IIUC, this new `buildSchemaName` method preserves the existing behavior when a
`schema.name` configuration value is provided but no `schema.namespace` value
is: the name is used as-is, whatever the `schema.name` value is. For example, a
`schema.name=` config value would use the blank (or empty) name for the schema.
However, if such a user were to simply _add_ a `schema.namespace` config
property and keep the `schema.name` set to a blank or empty string, then this
method throws a `DataException`.
It might be better to keep the behavior as close as possible to the older
behavior, and simply prepend the `schema.namespace` when it is set to a
non-blank string. IOW:
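```java
protected static String buildSchemaName(String namespace, String name) {
    // No usable namespace: keep the older behavior and return the name as-is.
    if (Utils.isBlank(namespace)) {
        return name;
    }
    // Trim whitespace and drop one trailing '.' so "a.b.c." and "a.b.c" are treated the same.
    namespace = namespace.trim().replaceAll("[.]$", "");
    return namespace + "." + name;
}
```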
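To make the trade-off concrete, the sketch below runs that logic against empty
and null names. `NameEdgeCases` and its `isBlank` helper (a stand-in for
`Utils.isBlank`) are hypothetical names used only for illustration. With this
logic a non-blank namespace is always prepended, so an empty name yields a
trailing `.` and a null name is concatenated as the text `null`; whether that
is acceptable is a decision for the PR.

```java
// Hypothetical sketch of the suggested logic; not code from the PR.
final class NameEdgeCases {

    // Local stand-in for Utils.isBlank: null, empty, or whitespace-only.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    static String buildSchemaName(String namespace, String name) {
        if (isBlank(namespace)) {
            return name;
        }
        String normalized = namespace.trim().replaceAll("[.]$", "");
        return normalized + "." + name;
    }

    public static void main(String[] args) {
        // No namespace: the name passes through untouched, even when empty or null.
        System.out.println("[" + buildSchemaName(null, "") + "]");      // prints "[]"
        System.out.println(buildSchemaName("", null));                  // prints "null"

        // Namespace set: no exception is thrown, but the result may not be a useful name.
        System.out.println("[" + buildSchemaName("a.b.c", "") + "]");   // prints "[a.b.c.]"
        System.out.println(buildSchemaName("a.b.c", null));             // prints "a.b.c.null"
    }
}
```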
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.