yashmayya commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r974127639
##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
/**
* Single message transformation for Kafka Connect record types.
- *
+ * <br/>
* Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
*/
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
+ String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+ String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
+ + "If set to `V1`, then the field paths are limited to access the elements at the root level of the struct or map."
+ + "If set to `V2`, the syntax will support accessing nested elements. o access nested elements, "
+ + "dotted notation is used. If dots are already included in the field name, then backtick pairs "
+ + "can be used to wrap field names containing dots. "
+ + "e.g. to access elements from a struct/map named \"foo.bar\", "
+ + "the following format can be used to access its elements: \"`foo.bar`.baz\".";
Review Comment:
I think this should be `from a field in a struct/map named ...` instead?
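For illustration only, here is a minimal sketch of what the V2 doc string describes. It assumes a path such as "`foo.bar`.baz" has already been split into the steps ["foo.bar", "baz"], with each step selecting a field one level deeper. The class `NestedLookup` is hypothetical and not part of the PR. It handles only nested `Map<String, Object>` values, not `Struct`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of the PR): walks nested Map<String, Object>
// values one step at a time, the way a V2 field path addresses nested fields.
final class NestedLookup {
    private NestedLookup() {}

    /** Returns the value at the given steps, or null if a step is missing or not a map. */
    static Object get(Map<String, Object> root, List<String> steps) {
        Object current = root;
        for (String step : steps) {
            if (!(current instanceof Map)) {
                return null; // cannot descend into a non-map value
            }
            current = ((Map<?, ?>) current).get(step);
        }
        return current;
    }
}
```

With a value like {"foo.bar": {"baz": 42}}, the steps ["foo.bar", "baz"] reach 42. The V1-style reading ["foo", "bar", "baz"] finds nothing, because no field is named "foo".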
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java:
##########
@@ -39,33 +41,35 @@
private static final String FIELD_CONFIG = "field";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");
+ .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.")
+ .define(FIELD_SYNTAX_VERSION_CONFIG, ConfigDef.Type.STRING, FIELD_SYNTAX_VERSION_DEFAULT_VALUE, ConfigDef.Importance.HIGH, FIELD_SYNTAX_VERSION_DOC);
Review Comment:
Could we add a validator here to ensure that the value of
`FIELD_SYNTAX_VERSION_CONFIG` is one of `FieldSyntaxVersion.values()`
(perhaps with a case-insensitive match)?
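Here is a minimal sketch of the suggested check. It assumes `FieldSyntaxVersion` is an enum with `V1` and `V2`, and declares a stand-in so the block compiles on its own. In Kafka this logic would sit in a `ConfigDef.Validator`, whose `ensureValid(String name, Object value)` method throws `ConfigException`. The sketch instead throws `IllegalArgumentException` to stay dependency-free, and it returns the parsed constant for convenience. Depending on the Kafka version targeted, `ConfigDef.CaseInsensitiveValidString.in(...)` may already cover this without a custom class.

```java
import java.util.Arrays;
import java.util.Locale;

// Stand-in for the PR's FieldSyntaxVersion enum (assumed to hold V1 and V2).
enum FieldSyntaxVersion { V1, V2 }

// Hypothetical sketch of the validation a ConfigDef.Validator could perform.
final class FieldSyntaxVersionValidator {
    private FieldSyntaxVersionValidator() {}

    static FieldSyntaxVersion ensureValid(String name, Object value) {
        if (value instanceof String) {
            // Case-insensitive match against the enum constant names.
            String normalized = ((String) value).toUpperCase(Locale.ROOT);
            for (FieldSyntaxVersion version : FieldSyntaxVersion.values()) {
                if (version.name().equals(normalized)) {
                    return version;
                }
            }
        }
        throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                + ": must be one of " + Arrays.toString(FieldSyntaxVersion.values()) + " (case-insensitive)");
    }
}
```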
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+ private static final String BACKTICK = "`";
+ private static final String DOT = ".";
+ public static final char BACKTICK_CHAR = '`';
+ public static final char DOT_CHAR = '.';
+ public static final char BACKSLASH_CHAR = '\\';
+
+ private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+ private final String[] path;
+
+ public static FieldPath ofV1(String field) {
+ return of(field, FieldSyntaxVersion.V1);
+ }
+
+ public static FieldPath ofV2(String field) {
+ return of(field, FieldSyntaxVersion.V2);
+ }
+
+ /**
+ * If version is V2, then paths are cached for further access.
+ *
+ * @param field field path expression
+ * @param version field syntax version
+ */
+ public static FieldPath of(String field, FieldSyntaxVersion version) {
+ if (field == null || field.isEmpty() || version.equals(FieldSyntaxVersion.V1)) {
+ return new FieldPath(field, version);
+ } else {
+ if (PATHS_CACHE.containsKey(field)) {
+ return PATHS_CACHE.get(field);
+ } else {
+ final FieldPath fieldPath = new FieldPath(field, version);
+ PATHS_CACHE.put(field, fieldPath);
+ return fieldPath;
+ }
+ }
+ }
+
+ FieldPath(String path, FieldSyntaxVersion version) {
+ if (path == null || path.isEmpty()) { // empty path
+ this.path = new String[] {};
+ } else {
+ switch (version) {
+ case V1: // backward compatibility
+ this.path = new String[] {path};
+ break;
+ case V2:
+ // if no dots or wrapping backticks are used, then return path with single step
+ if (!path.contains(DOT)
+ && !(path.startsWith(BACKTICK) && path.endsWith(
+ BACKTICK))) {
+ this.path = new String[] {path};
+ } else {
+ // prepare for tracking path steps
+ final List<String> steps = new ArrayList<>();
+ final StringBuilder s = new StringBuilder(
+ path); // avoid creating new string on changes
Review Comment:
```suggestion
final StringBuilder s = new StringBuilder(path); // avoid creating new string on changes
```
nit: unnecessary newline
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+ FieldPath(String path, FieldSyntaxVersion version) {
+ if (path == null || path.isEmpty()) { // empty path
+ this.path = new String[] {};
+ } else {
+ switch (version) {
+ case V1: // backward compatibility
+ this.path = new String[] {path};
+ break;
+ case V2:
+ // if no dots or wrapping backticks are used, then return path with single step
+ if (!path.contains(DOT)
+ && !(path.startsWith(BACKTICK) && path.endsWith(
+ BACKTICK))) {
Review Comment:
Why do we need this check?
##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
/**
* Single message transformation for Kafka Connect record types.
- *
+ * <br/>
* Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
*/
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
+ String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+ String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
+ + "If set to `V1`, then the field paths are limited to access the elements at the root level of the struct or map."
+ + "If set to `V2`, the syntax will support accessing nested elements. o access nested elements, "
+ + "dotted notation is used. If dots are already included in the field name, then backtick pairs "
+ + "can be used to wrap field names containing dots. "
+ + "e.g. to access elements from a struct/map named \"foo.bar\", "
+ + "the following format can be used to access its elements: \"`foo.bar`.baz\".";
+
+ String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = "V1";
Review Comment:
I think this (and the above config too) can be moved to `FieldPath` or
`FieldSyntaxVersion` in the `connect:transforms` subproject since they don't
need to be added to the public interface? And then you can avoid using the
string form "V1" directly here.
Or else, if the intention *is* to add it to the public interface and allow
custom SMT builders to also use this new notation, then should we consider
moving the `FieldPath` and related classes to the `connect:api` subproject?
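Here is a minimal sketch of the first option. It assumes `FieldSyntaxVersion` is an enum with `V1` and `V2`, as `FieldPath` suggests. The placement of the constants and the `fromConfig` helper are hypothetical, and the `FIELD_SYNTAX_VERSION_DOC` string would move alongside them in the same way. Deriving the default from `V1.name()` avoids repeating the bare "V1" literal.

```java
import java.util.Locale;

// Hypothetical layout: config key and default live next to the enum in
// connect:transforms instead of on the public Transformation interface.
enum FieldSyntaxVersion {
    V1,
    V2;

    public static final String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
    // Enum constants are initialized before this field, so V1 is safe to use here.
    public static final String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = V1.name();

    /** Parses a config value case-insensitively; throws IllegalArgumentException if unknown. */
    public static FieldSyntaxVersion fromConfig(String value) {
        return valueOf(value.toUpperCase(Locale.ROOT));
    }
}
```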
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
Review Comment:
```suggestion
* <li>If field names contain backticks at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
```
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+ FieldPath(String path, FieldSyntaxVersion version) {
+ if (path == null || path.isEmpty()) { // empty path
+ this.path = new String[] {};
+ } else {
+ switch (version) {
+ case V1: // backward compatibility
+ this.path = new String[] {path};
+ break;
+ case V2:
+ // if no dots or wrapping backticks are used, then return path with single step
+ if (!path.contains(DOT)
+ && !(path.startsWith(BACKTICK) && path.endsWith(
+ BACKTICK))) {
+ this.path = new String[] {path};
+ } else {
+ // prepare for tracking path steps
+ final List<String> steps = new ArrayList<>();
+ final StringBuilder s = new StringBuilder(
+ path); // avoid creating new string on changes
+
+ while (s.length() > 0) { // until path is traverse
+ // process backtick pair if any
+ if (s.charAt(0) == BACKTICK_CHAR) {
+ s.deleteCharAt(0);
+
+ // find backtick pair
+ int idx = 0;
+ while (idx >= 0) {
+ idx = s.indexOf(BACKTICK, idx);
+ if (idx == -1) {
+ throw new IllegalArgumentException(
+ "Incomplete backtick pair at [...]`" + s);
+ }
+ if (idx != s.length() - 1) { // non-global backtick
+ if (s.charAt(idx + 1) != DOT_CHAR
+ || s.charAt(idx - 1)
+ == BACKSLASH_CHAR) { // not wrapped or escaped
+ idx++; // move index forward and keep searching
+ } else { // it's end pair
+ steps.add(
+ checkIncompleteBacktickPair(s.substring(0, idx)));
+ s.delete(0, idx + 2); // rm backtick and dot
+ break;
+ }
+ } else { // global backtick
Review Comment:
Does a "global" backtick refer to a pair of backticks that wraps the whole
field path? I don't quite follow why that requires special handling.
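One way to test whether the special case is needed is a simplified splitter. In the sketch below (hypothetical, not the PR's `FieldPath`), a wrapped step closes at the first unescaped backtick that is followed either by a dot or by the end of the input. The end-of-path case therefore falls out of the same condition as the mid-path case. The sketch reproduces the expectations of several `FieldPathTest` cases. It does not reproduce the PR's single-step fast path or its exact error messages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical V2 splitter: one closing-backtick rule covers both
// "followed by a dot" and "at the end of the input".
final class V2PathSplitter {
    private V2PathSplitter() {}

    static List<String> split(String path) {
        List<String> steps = new ArrayList<>();
        if (path == null || path.isEmpty()) {
            return steps;
        }
        int pos = 0;
        while (pos < path.length()) {
            if (path.charAt(pos) == '`') {
                int close = findClosingBacktick(path, pos + 1);
                steps.add(unescape(path.substring(pos + 1, close)));
                pos = close + 2; // skip the closing backtick and the following dot
            } else {
                int dot = path.indexOf('.', pos);
                int end = dot < 0 ? path.length() : dot;
                steps.add(unescape(path.substring(pos, end)));
                pos = end + 1; // skip the dot
            }
        }
        return steps;
    }

    // A closing backtick is unescaped and sits at the end or right before a dot.
    private static int findClosingBacktick(String path, int from) {
        for (int i = from; i < path.length(); i++) {
            boolean escaped = i > 0 && path.charAt(i - 1) == '\\';
            boolean atBoundary = i == path.length() - 1 || path.charAt(i + 1) == '.';
            if (path.charAt(i) == '`' && !escaped && atBoundary) {
                return i;
            }
        }
        throw new IllegalArgumentException("Incomplete backtick pair in: " + path);
    }

    // Turns each escaped backtick (\`) into a plain backtick.
    private static String unescape(String step) {
        return step.replace("\\`", "`");
    }
}
```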
##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
/**
* Single message transformation for Kafka Connect record types.
- *
+ * <br/>
* Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
*/
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
+ String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+ String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
+ + "If set to `V1`, then the field paths are limited to access the elements at the root level of the struct or map."
+ + "If set to `V2`, the syntax will support accessing nested elements. o access nested elements, "
Review Comment:
```suggestion
+ "If set to `V2`, the syntax will support accessing nested elements. To access nested elements, "
```
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+ public static FieldPath ofV1(String field) {
+ return of(field, FieldSyntaxVersion.V1);
+ }
+
+ public static FieldPath ofV2(String field) {
+ return of(field, FieldSyntaxVersion.V2);
+ }
Review Comment:
nit: can be package-private if only visible for testing
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+ FieldPath(String path, FieldSyntaxVersion version) {
+ if (path == null || path.isEmpty()) { // empty path
+ this.path = new String[] {};
+ } else {
+ switch (version) {
+ case V1: // backward compatibility
+ this.path = new String[] {path};
+ break;
+ case V2:
+ // if no dots or wrapping backticks are used, then return path with single step
+ if (!path.contains(DOT)
+ && !(path.startsWith(BACKTICK) && path.endsWith(
+ BACKTICK))) {
+ this.path = new String[] {path};
+ } else {
+ // prepare for tracking path steps
+ final List<String> steps = new ArrayList<>();
+ final StringBuilder s = new StringBuilder(
+ path); // avoid creating new string on changes
+
+ while (s.length() > 0) { // until path is traverse
+ // process backtick pair if any
+ if (s.charAt(0) == BACKTICK_CHAR) {
+ s.deleteCharAt(0);
+
+ // find backtick pair
+ int idx = 0;
+ while (idx >= 0) {
+ idx = s.indexOf(BACKTICK, idx);
+ if (idx == -1) {
+ throw new IllegalArgumentException(
+ "Incomplete backtick pair at [...]`" + s);
+ }
+ if (idx != s.length() - 1) { // non-global backtick
+ if (s.charAt(idx + 1) != DOT_CHAR
+ || s.charAt(idx - 1)
+ == BACKSLASH_CHAR) { // not wrapped or escaped
+ idx++; // move index forward and keep searching
+ } else { // it's end pair
+ steps.add(
+ checkIncompleteBacktickPair(s.substring(0, idx)));
+ s.delete(0, idx + 2); // rm backtick and dot
+ break;
+ }
+ } else { // global backtick
+ steps.add(checkIncompleteBacktickPair(s.substring(0, idx)));
+ s.delete(0, s.length());
+ break;
+ }
+ }
+ } else { // process path dots
+ final int atDot = s.indexOf(DOT);
+ if (atDot > 0) { // get step and move forward
+ steps.add(checkIncompleteBacktickPair(s.substring(0, atDot)));
+ s.delete(0, atDot + 1);
+ } else { // add all
+ steps.add(checkIncompleteBacktickPair(s.toString()));
+ s.delete(0, s.length());
+ }
+ }
+ }
+
+ this.path = steps.toArray(new String[0]);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown syntax version: " + version);
+ }
+ }
+ }
+
+ private String checkIncompleteBacktickPair(String field) {
Review Comment:
Would it be possible to add a doc comment for this method? Also,
`checkIncompleteBacktickPair` seems like a misnomer for a method that
returns a String.
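Here is a sketch of what a documented, renamed helper could look like. It is based only on the behaviour `FieldPathTest` expects from the steps: each `\`` becomes a plain backtick. Any validation the PR's method also performs is not visible in this excerpt and is not reproduced. Both `FieldPathSteps` and `unescapeBackticks` are hypothetical names.

```java
// Hypothetical rename of checkIncompleteBacktickPair, derived from the
// FieldPathTest expectations only; it performs no extra validation.
final class FieldPathSteps {
    private FieldPathSteps() {}

    /**
     * Removes the escaping backslash from every escaped backtick in a single path step,
     * e.g. {@code bar\`.\`baz} becomes {@code bar`.`baz}.
     *
     * @param step one step of a V2 field path, already stripped of its wrapping backticks
     * @return the step with each escaped backtick replaced by a plain backtick
     */
    static String unescapeBackticks(String step) {
        return step.replace("\\`", "`");
    }
}
```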
##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+class FieldPathTest {
+ final static String[] EMPTY_PATH = new String[]{};
+
+ @Test void shouldBuildV1WithDotsAndBacktickPair() {
+ assertArrayEquals(new String[] {"foo.bar.baz"}, FieldPath.ofV1("foo.bar.baz").path());
+ assertArrayEquals(new String[] {"foo.`bar.baz`"}, FieldPath.ofV1("foo.`bar.baz`").path());
+ }
+
+ @Test void shouldBuildV2WithEmptyPath() {
+ assertArrayEquals(EMPTY_PATH, FieldPath.of("", FieldSyntaxVersion.V2).path());
+ }
+
+ @Test void shouldBuildV2WithNullPath() {
+ assertArrayEquals(EMPTY_PATH, FieldPath.of(null, FieldSyntaxVersion.V2).path());
+ }
+
+ @Test void shouldBuildV2WithoutDots() {
+ assertArrayEquals(new String[] {"foobarbaz"}, FieldPath.of("foobarbaz", FieldSyntaxVersion.V2).path());
+ }
+ @Test void shouldBuildV2WithoutWrappingBackticks() {
+ assertArrayEquals(new String[] {"foo`bar`baz"}, FieldPath.of("foo`bar`baz", FieldSyntaxVersion.V2).path());
+ }
+
+ @Test void shouldBuildV2WhenIncludesDots() {
+ assertArrayEquals(new String[] {"foo", "bar", "baz"}, FieldPath.of("foo.bar.baz", FieldSyntaxVersion.V2).path());
+ }
+
+ @Test void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
+ assertArrayEquals(new String[] {"foo", "bar.baz"}, FieldPath.of("foo.`bar.baz`", FieldSyntaxVersion.V2).path());
+ assertArrayEquals(new String[] {"foo", "bar", "baz"}, FieldPath.of("foo.`bar`.baz", FieldSyntaxVersion.V2).path());
+ }
+
+ @Test void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
+ assertArrayEquals(new String[] {"foo", "ba`r.baz"}, FieldPath.of("foo.`ba`r.baz`", FieldSyntaxVersion.V2).path());
+ assertArrayEquals(new String[] {"foo", "ba`r", "baz"}, FieldPath.of("foo.ba`r.baz", FieldSyntaxVersion.V2).path());
+ }
+
+ @Test void shouldBuildV2AndEscapeBackticks() {
+ assertArrayEquals(new String[] {"foo", "bar`.`baz"}, FieldPath.of("foo.`bar\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+ assertArrayEquals(new String[] {"foo", "bar\\`.`baz"}, FieldPath.of("foo.`bar\\\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+ }
+
+ @Test void shouldBuildV2WithBackticksWrappingBackticks() {
Review Comment:
The rules defined in the KIP don't seem to include this case (although there
is an example in the subsequent section)?
https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures#KIP821:ConnectTransformssupportfornestedstructures-Rules