This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_10x by this push:
new 639a3884983 SOLR-18062: CrossDC - support arbitrary Kafka properties. (cherry-pick from #4087)
639a3884983 is described below
commit 639a3884983447f80ed3c0295edcd5348f372260
Author: Andrzej Białecki <[email protected]>
AuthorDate: Mon Feb 9 13:58:25 2026 +0100
SOLR-18062: CrossDC - support arbitrary Kafka properties. (cherry-pick from #4087)
---
changelog/unreleased/solr-18062.yml | 9 +
.../org/apache/solr/crossdc/common/ConfUtil.java | 50 +++
.../apache/solr/crossdc/common/ConfUtilTest.java | 364 +++++++++++++++++++++
.../pages/cross-dc-replication.adoc | 6 +
4 files changed, 429 insertions(+)
diff --git a/changelog/unreleased/solr-18062.yml b/changelog/unreleased/solr-18062.yml
new file mode 100644
index 00000000000..30ef0381bb8
--- /dev/null
+++ b/changelog/unreleased/solr-18062.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: CrossDC - support arbitrary Kafka properties
+type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other
+authors:
+ - name: Andrzej Bialecki
+ nick: ab
+links:
+ - name: SOLR-18062
+ url: https://issues.apache.org/jira/browse/SOLR-18062
diff --git a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java
index 2fde6a6d270..924f2be802b 100644
--- a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java
+++ b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java
@@ -21,9 +21,11 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;
import java.io.ByteArrayInputStream;
import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.SuppressForbidden;
@@ -34,6 +36,9 @@ import org.slf4j.LoggerFactory;
public class ConfUtil {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final String KAFKA_ENV_PREFIX = "SOLR_CROSSDC_KAFKA_";
+ public static final String KAFKA_PROP_PREFIX = "solr.crossdc.kafka.";
+
public static void fillProperties(SolrZkClient solrClient, Map<String, Object> properties) {
// fill in from environment
Map<String, String> env = System.getenv();
@@ -47,6 +52,14 @@ public class ConfUtil {
properties.put(configKey.getKey(), val);
}
}
+ // fill in aux Kafka env with prefix
+ env.forEach(
+ (key, val) -> {
+ if (key.startsWith(KAFKA_ENV_PREFIX)) {
+ properties.put(normalizeKafkaEnvKey(key), val);
+ }
+ });
+
// fill in from system properties
for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
String val = System.getProperty(configKey.getKey());
@@ -54,6 +67,15 @@ public class ConfUtil {
properties.put(configKey.getKey(), val);
}
}
+ // fill in aux Kafka system properties with prefix
+ System.getProperties()
+ .forEach(
+ (key, val) -> {
+ if (key.toString().startsWith(KAFKA_PROP_PREFIX)) {
+ properties.put(normalizeKafkaSysPropKey(key.toString()), val);
+ }
+ });
+
Properties zkProps = new Properties();
if (solrClient != null) {
try {
@@ -90,6 +112,34 @@ public class ConfUtil {
e);
}
}
+ // normalize any left aux properties by stripping prefixes
+ if (!properties.isEmpty()) {
+ Set<String> keys = new HashSet<>(properties.keySet());
+ keys.forEach(
+ key -> {
+ if (key.startsWith(KAFKA_ENV_PREFIX)) {
+            properties.put(normalizeKafkaEnvKey(key), properties.remove(key));
+          } else if (key.startsWith(KAFKA_PROP_PREFIX)) {
+            properties.put(normalizeKafkaSysPropKey(key), properties.remove(key));
+ }
+ });
+ }
+ }
+
+ public static String normalizeKafkaEnvKey(String key) {
+ if (key.startsWith(KAFKA_ENV_PREFIX)) {
+      return key.substring(KAFKA_ENV_PREFIX.length()).toLowerCase(Locale.ROOT).replace('_', '.');
+ } else {
+ return key;
+ }
+ }
+
+ public static String normalizeKafkaSysPropKey(String key) {
+ if (key.startsWith(KAFKA_PROP_PREFIX)) {
+      return key.substring(KAFKA_PROP_PREFIX.length()).toLowerCase(Locale.ROOT);
+ } else {
+ return key;
+ }
}
public static void verifyProperties(Map<String, Object> properties) {
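For illustration, a minimal sketch of the key normalization performed by the new helpers above (behavior read off the diff; the example keys are hypothetical):

    // prefix stripped, lowercased, '_' replaced with '.'
    ConfUtil.normalizeKafkaEnvKey("SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS"); // -> "max.poll.records"
    // prefix stripped, lowercased
    ConfUtil.normalizeKafkaSysPropKey("solr.crossdc.kafka.compression.type"); // -> "compression.type"
    // keys without the expected prefix pass through unchanged
    ConfUtil.normalizeKafkaEnvKey("UNRELATED_KEY"); // -> "UNRELATED_KEY"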
diff --git a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java
new file mode 100644
index 00000000000..49de459b288
--- /dev/null
+++ b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit tests for the ConfUtil utility class. Tests configuration property resolution from
+ * multiple sources (environment variables, system properties, ZooKeeper) and validates the
+ * correct priority and handling of custom Kafka properties.
+ */
+public class ConfUtilTest extends SolrTestCaseJ4 {
+
+ @Mock private SolrZkClient mockZkClient;
+
+ private AutoCloseable mocks;
+ private Properties originalSysProps;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ assumeWorkingMockito();
+ mocks = MockitoAnnotations.openMocks(this);
+
+ // Save original system properties
+ originalSysProps = new Properties();
+ originalSysProps.putAll(System.getProperties());
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ // Restore original system properties
+ System.setProperties(originalSysProps);
+
+ if (mocks != null) {
+ mocks.close();
+ }
+ }
+
+ @Test
+ public void testFillProperties_WithStandardConfigKeys() {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Set system properties with standard config keys
+ System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+ System.setProperty(KafkaCrossDcConf.TOPIC_NAME, "test-topic");
+ System.setProperty(KafkaCrossDcConf.GROUP_ID, "test-group");
+
+ ConfUtil.fillProperties(null, properties);
+
+ assertEquals("localhost:9092",
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ assertEquals("test-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+ assertEquals("test-group", properties.get(KafkaCrossDcConf.GROUP_ID));
+ }
+
+ @Test
+ public void testFillProperties_WithKafkaPrefixSystemProperties() {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Set custom Kafka properties with solr.crossdc.kafka. prefix
+ System.setProperty("solr.crossdc.kafka.max.request.size", "2097152");
+ System.setProperty("solr.crossdc.kafka.compression.type", "gzip");
+ System.setProperty("solr.crossdc.kafka.acks", "all");
+
+ ConfUtil.fillProperties(null, properties);
+
+    // Verify custom Kafka properties are added with correct keys (lowercase, dots)
+ assertEquals("2097152", properties.get("max.request.size"));
+ assertEquals("gzip", properties.get("compression.type"));
+ assertEquals("all", properties.get("acks"));
+ }
+
+ @Test
+ public void testFillProperties_WithMixedCaseKafkaPrefix() {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Test solr.crossdc.kafka. prefix handles case conversion correctly
+ System.setProperty("solr.crossdc.kafka.SSL.Protocol", "TLSv1.2");
+ System.setProperty("solr.crossdc.kafka.security.protocol", "SSL");
+
+ ConfUtil.fillProperties(null, properties);
+
+ // Properties should be converted to lowercase
+ assertEquals("TLSv1.2", properties.get("ssl.protocol"));
+ assertEquals("SSL", properties.get("security.protocol"));
+ }
+
+ @Test
+ public void testFillProperties_FromZooKeeper() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Mock ZooKeeper data
+ Properties zkProps = new Properties();
+ zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092");
+ zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic");
+ zkProps.setProperty("custom.zk.property", "zk-value");
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8");
+ zkProps.store(writer, null);
+ writer.close();
+ byte[] zkData = baos.toByteArray();
+
+ when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(zkData);
+
+ ConfUtil.fillProperties(mockZkClient, properties);
+
+ // Verify ZooKeeper properties are loaded
+ assertEquals("zk-kafka:9092",
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+ assertEquals("zk-value", properties.get("custom.zk.property"));
+ }
+
+ @Test
+ public void testFillProperties_PriorityOrder() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Set up ZooKeeper with lowest priority
+ Properties zkProps = new Properties();
+ zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092");
+ zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic");
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8");
+ zkProps.store(writer, null);
+ writer.close();
+ byte[] zkData = baos.toByteArray();
+
+ when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(zkData);
+
+ // Set system property with higher priority
+ System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "sys-kafka:9092");
+
+ ConfUtil.fillProperties(mockZkClient, properties);
+
+ // System property should override ZooKeeper value
+ assertEquals("sys-kafka:9092",
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ // ZooKeeper value should be used when no system property is set
+ assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+ }
+
+ @Test
+ public void testFillProperties_CustomKafkaPropertiesFromSystemProps() {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Set various custom Kafka properties
+ System.setProperty("solr.crossdc.kafka.batch.size", "16384");
+ System.setProperty("solr.crossdc.kafka.linger.ms", "10");
+ System.setProperty("solr.crossdc.kafka.buffer.memory", "33554432");
+ System.setProperty("solr.crossdc.kafka.retries", "3");
+
+ ConfUtil.fillProperties(null, properties);
+
+ // Verify all custom properties are present with correct transformation
+ assertEquals("16384", properties.get("batch.size"));
+ assertEquals("10", properties.get("linger.ms"));
+ assertEquals("33554432", properties.get("buffer.memory"));
+ assertEquals("3", properties.get("retries"));
+ }
+
+ @Test
+ public void testFillProperties_NullZkClient() {
+ Map<String, Object> properties = new HashMap<>();
+
+ System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+ System.setProperty("solr.crossdc.kafka.compression.type", "snappy");
+
+ // Should not throw exception with null ZK client
+ ConfUtil.fillProperties(null, properties);
+
+ assertEquals("localhost:9092",
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ assertEquals("snappy", properties.get("compression.type"));
+ }
+
+ @Test(expected = SolrException.class)
+ public void testFillProperties_ZkDataNull() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(null);
+
+ // Should throw SolrException when ZK data is null
+ ConfUtil.fillProperties(mockZkClient, properties);
+ }
+
+ @Test(expected = SolrException.class)
+ public void testFillProperties_ZkInterrupted() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ when(mockZkClient.exists(anyString())).thenReturn(true);
+ when(mockZkClient.getData(anyString(), isNull(), isNull()))
+ .thenThrow(new InterruptedException("Test interrupt"));
+
+ // Should throw SolrException and set interrupt flag
+ ConfUtil.fillProperties(mockZkClient, properties);
+ }
+
+ @Test(expected = SolrException.class)
+ public void testFillProperties_ZkException() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ when(mockZkClient.exists(anyString())).thenReturn(true);
+ when(mockZkClient.getData(anyString(), isNull(), isNull()))
+ .thenThrow(new RuntimeException("Test exception"));
+
+ // Should throw SolrException wrapping the original exception
+ ConfUtil.fillProperties(mockZkClient, properties);
+ }
+
+ @Test
+ public void testVerifyProperties_ValidConfiguration() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic");
+
+ // Should not throw exception
+ ConfUtil.verifyProperties(properties);
+ }
+
+ @Test(expected = SolrException.class)
+ public void testVerifyProperties_MissingBootstrapServers() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic");
+
+ // Should throw SolrException due to missing bootstrapServers
+ ConfUtil.verifyProperties(properties);
+ }
+
+ @Test(expected = SolrException.class)
+ public void testVerifyProperties_MissingTopicName() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+
+ // Should throw SolrException due to missing topicName
+ ConfUtil.verifyProperties(properties);
+ }
+
+ @Test(expected = SolrException.class)
+ public void testVerifyProperties_BothMissing() {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Should throw SolrException due to missing both required properties
+ ConfUtil.verifyProperties(properties);
+ }
+
+ @Test
+ public void testFillProperties_EmptyProperties() {
+ Map<String, Object> properties = new HashMap<>();
+
+ // No system properties or environment variables set
+ ConfUtil.fillProperties(null, properties);
+
+    // Should complete without error, properties may be empty or contain defaults
+ assertNotNull(properties);
+ }
+
+ @Test
+ public void testFillProperties_SecurityProperties() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Set security-related properties
+ System.setProperty("solr.crossdc.kafka.ssl.truststore.location",
"/path/to/truststore");
+ System.setProperty("solr.crossdc.kafka.ssl.keystore.location",
"/path/to/keystore");
+ System.setProperty("solr.crossdc.kafka.security.protocol", "SSL");
+
+ ConfUtil.fillProperties(null, properties);
+
+ // Verify security properties are correctly transformed
+ assertEquals("/path/to/truststore",
properties.get("ssl.truststore.location"));
+ assertEquals("/path/to/keystore", properties.get("ssl.keystore.location"));
+ assertEquals("SSL", properties.get("security.protocol"));
+ }
+
+ @Test
+ public void testFillProperties_ComplexScenario() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Create a complex scenario with multiple sources
+ Properties zkProps = new Properties();
+ zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092");
+ zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic");
+ zkProps.setProperty(KafkaCrossDcConf.GROUP_ID, "zk-group");
+ zkProps.setProperty("zk.only.property", "zk-value");
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8");
+ zkProps.store(writer, null);
+ writer.close();
+ byte[] zkData = baos.toByteArray();
+
+ when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(zkData);
+
+ // Set system properties - should override ZK
+ System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "sys-kafka:9092");
+ System.setProperty("solr.crossdc.kafka.max.poll.records", "1000");
+ System.setProperty("solr.crossdc.kafka.enable.auto.commit", "false");
+
+ ConfUtil.fillProperties(mockZkClient, properties);
+
+ // Verify priority: system props > ZK
+ assertEquals("sys-kafka:9092",
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+ assertEquals("zk-group", properties.get(KafkaCrossDcConf.GROUP_ID));
+
+ // Verify custom Kafka properties from system props
+ assertEquals("1000", properties.get("max.poll.records"));
+ assertEquals("false", properties.get("enable.auto.commit"));
+
+ // Verify ZK-only property
+ assertEquals("zk-value", properties.get("zk.only.property"));
+ }
+
+  // we can't easily modify env vars in tests, so verify just the key conversion via the properties map
+ @Test
+ public void testUnderscoreToDotsConversion() {
+ Map<String, Object> properties = new HashMap<>();
+
+    // Test underscore-to-dot conversion for env-style keys with the SOLR_CROSSDC_KAFKA_ prefix
+ properties.put("SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS", "500");
+ properties.put("SOLR_CROSSDC_KAFKA_FETCH_MAX_WAIT_MS", "1000");
+
+ ConfUtil.fillProperties(null, properties);
+
+ // Verify underscores are converted to dots
+ assertEquals("500", properties.get("max.poll.records"));
+ assertEquals("1000", properties.get("fetch.max.wait.ms"));
+ }
+}
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
index ec0e4da1df4..8211aa04e1d 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
@@ -135,6 +135,12 @@ Optional configuration properties:
`solr.crossdc.expandDbq` _<enum>_ :: If set to `expand` (default) then Delete-By-Query will be expanded before mirroring into series of Delete-By-Id, which may help with correct processing of out-of-order requests on the consumer side.
If set to `none` then Delete-By-Query requests will be mirrored as-is.
+Additional configuration specific to the Kafka client library can be passed through using either
+environment variables starting with `SOLR_CROSSDC_KAFKA_` or system properties starting with
+`solr.crossdc.kafka.`. For example, to adjust the Kafka client's `max.poll.records` property you can
+set either the `SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS` environment variable or the
+`solr.crossdc.kafka.max.poll.records` system property.
+
=== CrossDC Manager
. Start the Manager process using the included start script at `solr/cross-dc-manager/bin/cross-dc-manager` (or `cross-dc-manager.cmd` for Windows).