This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d8c367066f KAFKA-17192 Fix MirrorMaker2 worker config does not pass
config.providers… (#16678)
8d8c367066f is described below
commit 8d8c367066f59d888d6aa2011b840915cd07a3c2
Author: Kondrat Bertalan <[email protected]>
AuthorDate: Wed Jul 31 22:50:22 2024 +0200
KAFKA-17192 Fix MirrorMaker2 worker config does not pass config.providers…
(#16678)
Reviewers: Chris Egerton <[email protected]>
---
.../main/java/org/apache/kafka/common/utils/Utils.java | 17 ++++++++++++++++-
.../apache/kafka/connect/mirror/MirrorMakerConfig.java | 2 +-
.../kafka/connect/mirror/MirrorMakerConfigTest.java | 1 +
3 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index f2961a8f282..b23a9d72a4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1648,9 +1648,24 @@ public final class Utils {
* @param <V> the type of values stored in the map
*/
public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map,
String prefix, boolean strip) {
+ return entriesWithPrefix(map, prefix, strip, false);
+ }
+
+ /**
+ * Find all key/value pairs whose keys begin with the given prefix,
optionally removing that prefix
+ * from all resulting keys.
+ * @param map the map to filter key/value pairs from
+ * @param prefix the prefix to search keys for
+ * @param strip whether the keys of the returned map should not include
the prefix
+ * @param allowMatchingLength whether to include keys that are exactly the
same length as the prefix
+ * @return a {@link Map} containing a key/value pair for every key/value
pair in the {@code map}
+ * parameter whose key begins with the given {@code prefix}; may be empty,
but never null
+ * @param <V> the type of values stored in the map
+ */
+ public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map,
String prefix, boolean strip, boolean allowMatchingLength) {
Map<String, V> result = new HashMap<>();
for (Map.Entry<String, V> entry : map.entrySet()) {
- if (entry.getKey().startsWith(prefix) && entry.getKey().length() >
prefix.length()) {
+ if (entry.getKey().startsWith(prefix) && (allowMatchingLength ||
entry.getKey().length() > prefix.length())) {
if (strip)
result.put(entry.getKey().substring(prefix.length()),
entry.getValue());
else
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 8f9f06f058e..014d9767288 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -308,7 +308,7 @@ public class MirrorMakerConfig extends AbstractConfig {
}
private Map<String, String> stringsWithPrefix(String prefix) {
- return Utils.entriesWithPrefix(rawProperties, prefix, false);
+ return Utils.entriesWithPrefix(rawProperties, prefix, false, true);
}
static Map<String, String> clusterConfigsWithPrefix(String prefix,
Map<String, String> props) {
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 163ebdd4b7b..03f3bd6aaa8 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -257,6 +257,7 @@ public class MirrorMakerConfigTest {
assertEquals("b->a", aProps.get("client.id"));
assertEquals("123", aProps.get("offset.storage.replication.factor"));
assertEquals("__", aProps.get("replication.policy.separator"));
+ assertEquals("fake", aProps.get("config.providers"));
Map<String, String> bProps = mirrorConfig.workerConfig(b);
assertEquals("a->b", bProps.get("client.id"));
assertEquals("456", bProps.get("status.storage.replication.factor"));