This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new d54f49acdbd SOLR-18062: CrossDC - support arbitrary Kafka properties. 
(#4087)
d54f49acdbd is described below

commit d54f49acdbddb2d0cbafe8ce683763dea503d6af
Author: Andrzej Białecki <[email protected]>
AuthorDate: Mon Feb 9 13:40:20 2026 +0100

    SOLR-18062: CrossDC - support arbitrary Kafka properties. (#4087)
---
 changelog/unreleased/solr-18062.yml                |   9 +
 .../org/apache/solr/crossdc/common/ConfUtil.java   |  50 +++
 .../apache/solr/crossdc/common/ConfUtilTest.java   | 364 +++++++++++++++++++++
 .../pages/cross-dc-replication.adoc                |   6 +
 4 files changed, 429 insertions(+)

diff --git a/changelog/unreleased/solr-18062.yml 
b/changelog/unreleased/solr-18062.yml
new file mode 100644
index 00000000000..30ef0381bb8
--- /dev/null
+++ b/changelog/unreleased/solr-18062.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: CrossDC - support arbitrary Kafka properties
+type: added # added, changed, fixed, deprecated, removed, dependency_update, 
security, other
+authors:
+  - name: Andrzej Bialecki
+    nick: ab
+links:
+  - name: SOLR-18062
+    url: https://issues.apache.org/jira/browse/SOLR-18062
diff --git 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java
index 2fde6a6d270..924f2be802b 100644
--- 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java
+++ 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java
@@ -21,9 +21,11 @@ import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;
 
 import java.io.ByteArrayInputStream;
 import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.util.SuppressForbidden;
@@ -34,6 +36,9 @@ import org.slf4j.LoggerFactory;
 public class ConfUtil {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public static final String KAFKA_ENV_PREFIX = "SOLR_CROSSDC_KAFKA_";
+  public static final String KAFKA_PROP_PREFIX = "solr.crossdc.kafka.";
+
   public static void fillProperties(SolrZkClient solrClient, Map<String, 
Object> properties) {
     // fill in from environment
     Map<String, String> env = System.getenv();
@@ -47,6 +52,14 @@ public class ConfUtil {
         properties.put(configKey.getKey(), val);
       }
     }
+    // fill in aux Kafka env with prefix
+    env.forEach(
+        (key, val) -> {
+          if (key.startsWith(KAFKA_ENV_PREFIX)) {
+            properties.put(normalizeKafkaEnvKey(key), val);
+          }
+        });
+
     // fill in from system properties
     for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
       String val = System.getProperty(configKey.getKey());
@@ -54,6 +67,15 @@ public class ConfUtil {
         properties.put(configKey.getKey(), val);
       }
     }
+    // fill in aux Kafka system properties with prefix
+    System.getProperties()
+        .forEach(
+            (key, val) -> {
+              if (key.toString().startsWith(KAFKA_PROP_PREFIX)) {
+                properties.put(normalizeKafkaSysPropKey(key.toString()), val);
+              }
+            });
+
     Properties zkProps = new Properties();
     if (solrClient != null) {
       try {
@@ -90,6 +112,34 @@ public class ConfUtil {
             e);
       }
     }
+    // normalize any left aux properties by stripping prefixes
+    if (!properties.isEmpty()) {
+      Set<String> keys = new HashSet<>(properties.keySet());
+      keys.forEach(
+          key -> {
+            if (key.startsWith(KAFKA_ENV_PREFIX)) {
+              properties.put(normalizeKafkaEnvKey(key), 
properties.remove(key));
+            } else if (key.startsWith(KAFKA_PROP_PREFIX)) {
+              properties.put(normalizeKafkaSysPropKey(key), 
properties.remove(key));
+            }
+          });
+    }
+  }
+
+  public static String normalizeKafkaEnvKey(String key) {
+    if (key.startsWith(KAFKA_ENV_PREFIX)) {
+      return 
key.substring(KAFKA_ENV_PREFIX.length()).toLowerCase(Locale.ROOT).replace('_', 
'.');
+    } else {
+      return key;
+    }
+  }
+
+  public static String normalizeKafkaSysPropKey(String key) {
+    if (key.startsWith(KAFKA_PROP_PREFIX)) {
+      return 
key.substring(KAFKA_PROP_PREFIX.length()).toLowerCase(Locale.ROOT);
+    } else {
+      return key;
+    }
   }
 
   public static void verifyProperties(Map<String, Object> properties) {
diff --git 
a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java
 
b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java
new file mode 100644
index 00000000000..49de459b288
--- /dev/null
+++ 
b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Comprehensive unit tests for ConfUtil utility class. Tests configuration 
property resolution from
+ * multiple sources: - Environment variables - System properties - ZooKeeper 
And validates the
+ * correct priority and handling of custom Kafka properties.
+ */
+public class ConfUtilTest extends SolrTestCaseJ4 {
+
+  @Mock private SolrZkClient mockZkClient;
+
+  private AutoCloseable mocks;
+  private Properties originalSysProps;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    assumeWorkingMockito();
+    mocks = MockitoAnnotations.openMocks(this);
+
+    // Save original system properties
+    originalSysProps = new Properties();
+    originalSysProps.putAll(System.getProperties());
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    // Restore original system properties
+    System.setProperties(originalSysProps);
+
+    if (mocks != null) {
+      mocks.close();
+    }
+  }
+
+  @Test
+  public void testFillProperties_WithStandardConfigKeys() {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Set system properties with standard config keys
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+    System.setProperty(KafkaCrossDcConf.TOPIC_NAME, "test-topic");
+    System.setProperty(KafkaCrossDcConf.GROUP_ID, "test-group");
+
+    ConfUtil.fillProperties(null, properties);
+
+    assertEquals("localhost:9092", 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+    assertEquals("test-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+    assertEquals("test-group", properties.get(KafkaCrossDcConf.GROUP_ID));
+  }
+
+  @Test
+  public void testFillProperties_WithKafkaPrefixSystemProperties() {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Set custom Kafka properties with solr.crossdc.kafka. prefix
+    System.setProperty("solr.crossdc.kafka.max.request.size", "2097152");
+    System.setProperty("solr.crossdc.kafka.compression.type", "gzip");
+    System.setProperty("solr.crossdc.kafka.acks", "all");
+
+    ConfUtil.fillProperties(null, properties);
+
+    // Verify custom Kafka properties are added with correct keys (lowercase, 
dots)
+    assertEquals("2097152", properties.get("max.request.size"));
+    assertEquals("gzip", properties.get("compression.type"));
+    assertEquals("all", properties.get("acks"));
+  }
+
+  @Test
+  public void testFillProperties_WithMixedCaseKafkaPrefix() {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Test solr.crossdc.kafka. prefix handles case conversion correctly
+    System.setProperty("solr.crossdc.kafka.SSL.Protocol", "TLSv1.2");
+    System.setProperty("solr.crossdc.kafka.security.protocol", "SSL");
+
+    ConfUtil.fillProperties(null, properties);
+
+    // Properties should be converted to lowercase
+    assertEquals("TLSv1.2", properties.get("ssl.protocol"));
+    assertEquals("SSL", properties.get("security.protocol"));
+  }
+
+  @Test
+  public void testFillProperties_FromZooKeeper() throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Mock ZooKeeper data
+    Properties zkProps = new Properties();
+    zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092");
+    zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic");
+    zkProps.setProperty("custom.zk.property", "zk-value");
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8");
+    zkProps.store(writer, null);
+    writer.close();
+    byte[] zkData = baos.toByteArray();
+
+    when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), 
isNull())).thenReturn(zkData);
+
+    ConfUtil.fillProperties(mockZkClient, properties);
+
+    // Verify ZooKeeper properties are loaded
+    assertEquals("zk-kafka:9092", 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+    assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+    assertEquals("zk-value", properties.get("custom.zk.property"));
+  }
+
+  @Test
+  public void testFillProperties_PriorityOrder() throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Set up ZooKeeper with lowest priority
+    Properties zkProps = new Properties();
+    zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092");
+    zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic");
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8");
+    zkProps.store(writer, null);
+    writer.close();
+    byte[] zkData = baos.toByteArray();
+
+    when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), 
isNull())).thenReturn(zkData);
+
+    // Set system property with higher priority
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "sys-kafka:9092");
+
+    ConfUtil.fillProperties(mockZkClient, properties);
+
+    // System property should override ZooKeeper value
+    assertEquals("sys-kafka:9092", 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+    // ZooKeeper value should be used when no system property is set
+    assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+  }
+
+  @Test
+  public void testFillProperties_CustomKafkaPropertiesFromSystemProps() {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Set various custom Kafka properties
+    System.setProperty("solr.crossdc.kafka.batch.size", "16384");
+    System.setProperty("solr.crossdc.kafka.linger.ms", "10");
+    System.setProperty("solr.crossdc.kafka.buffer.memory", "33554432");
+    System.setProperty("solr.crossdc.kafka.retries", "3");
+
+    ConfUtil.fillProperties(null, properties);
+
+    // Verify all custom properties are present with correct transformation
+    assertEquals("16384", properties.get("batch.size"));
+    assertEquals("10", properties.get("linger.ms"));
+    assertEquals("33554432", properties.get("buffer.memory"));
+    assertEquals("3", properties.get("retries"));
+  }
+
+  @Test
+  public void testFillProperties_NullZkClient() {
+    Map<String, Object> properties = new HashMap<>();
+
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+    System.setProperty("solr.crossdc.kafka.compression.type", "snappy");
+
+    // Should not throw exception with null ZK client
+    ConfUtil.fillProperties(null, properties);
+
+    assertEquals("localhost:9092", 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+    assertEquals("snappy", properties.get("compression.type"));
+  }
+
+  @Test(expected = SolrException.class)
+  public void testFillProperties_ZkDataNull() throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+
+    when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), 
isNull())).thenReturn(null);
+
+    // Should throw SolrException when ZK data is null
+    ConfUtil.fillProperties(mockZkClient, properties);
+  }
+
+  @Test(expected = SolrException.class)
+  public void testFillProperties_ZkInterrupted() throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+
+    when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), isNull()))
+        .thenThrow(new InterruptedException("Test interrupt"));
+
+    // Should throw SolrException and set interrupt flag
+    ConfUtil.fillProperties(mockZkClient, properties);
+  }
+
+  @Test(expected = SolrException.class)
+  public void testFillProperties_ZkException() throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+
+    when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), isNull()))
+        .thenThrow(new RuntimeException("Test exception"));
+
+    // Should throw SolrException wrapping the original exception
+    ConfUtil.fillProperties(mockZkClient, properties);
+  }
+
+  @Test
+  public void testVerifyProperties_ValidConfiguration() {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic");
+
+    // Should not throw exception
+    ConfUtil.verifyProperties(properties);
+  }
+
+  @Test(expected = SolrException.class)
+  public void testVerifyProperties_MissingBootstrapServers() {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic");
+
+    // Should throw SolrException due to missing bootstrapServers
+    ConfUtil.verifyProperties(properties);
+  }
+
+  @Test(expected = SolrException.class)
+  public void testVerifyProperties_MissingTopicName() {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+
+    // Should throw SolrException due to missing topicName
+    ConfUtil.verifyProperties(properties);
+  }
+
+  @Test(expected = SolrException.class)
+  public void testVerifyProperties_BothMissing() {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Should throw SolrException due to missing both required properties
+    ConfUtil.verifyProperties(properties);
+  }
+
+  @Test
+  public void testFillProperties_EmptyProperties() {
+    Map<String, Object> properties = new HashMap<>();
+
+    // No system properties or environment variables set
+    ConfUtil.fillProperties(null, properties);
+
+    // Should complete without error, properties may be empty or contain 
defaults
+    assertNotNull(properties);
+  }
+
+  @Test
+  public void testFillProperties_SecurityProperties() throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Set security-related properties
+    System.setProperty("solr.crossdc.kafka.ssl.truststore.location", 
"/path/to/truststore");
+    System.setProperty("solr.crossdc.kafka.ssl.keystore.location", 
"/path/to/keystore");
+    System.setProperty("solr.crossdc.kafka.security.protocol", "SSL");
+
+    ConfUtil.fillProperties(null, properties);
+
+    // Verify security properties are correctly transformed
+    assertEquals("/path/to/truststore", 
properties.get("ssl.truststore.location"));
+    assertEquals("/path/to/keystore", properties.get("ssl.keystore.location"));
+    assertEquals("SSL", properties.get("security.protocol"));
+  }
+
+  @Test
+  public void testFillProperties_ComplexScenario() throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Create a complex scenario with multiple sources
+    Properties zkProps = new Properties();
+    zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092");
+    zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic");
+    zkProps.setProperty(KafkaCrossDcConf.GROUP_ID, "zk-group");
+    zkProps.setProperty("zk.only.property", "zk-value");
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8");
+    zkProps.store(writer, null);
+    writer.close();
+    byte[] zkData = baos.toByteArray();
+
+    when(mockZkClient.exists(anyString())).thenReturn(true);
+    when(mockZkClient.getData(anyString(), isNull(), 
isNull())).thenReturn(zkData);
+
+    // Set system properties - should override ZK
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "sys-kafka:9092");
+    System.setProperty("solr.crossdc.kafka.max.poll.records", "1000");
+    System.setProperty("solr.crossdc.kafka.enable.auto.commit", "false");
+
+    ConfUtil.fillProperties(mockZkClient, properties);
+
+    // Verify priority: system props > ZK
+    assertEquals("sys-kafka:9092", 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+    assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME));
+    assertEquals("zk-group", properties.get(KafkaCrossDcConf.GROUP_ID));
+
+    // Verify custom Kafka properties from system props
+    assertEquals("1000", properties.get("max.poll.records"));
+    assertEquals("false", properties.get("enable.auto.commit"));
+
+    // Verify ZK-only property
+    assertEquals("zk-value", properties.get("zk.only.property"));
+  }
+
+  // we can't easily modify envvars, test just the key conversion in properties
+  @Test
+  public void testUnderscoreToDotsConversion() {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Test underscore to dot conversion in kafka. prefix env
+    properties.put("SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS", "500");
+    properties.put("SOLR_CROSSDC_KAFKA_FETCH_MAX_WAIT_MS", "1000");
+
+    ConfUtil.fillProperties(null, properties);
+
+    // Verify underscores are converted to dots
+    assertEquals("500", properties.get("max.poll.records"));
+    assertEquals("1000", properties.get("fetch.max.wait.ms"));
+  }
+}
diff --git 
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc 
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
index ec0e4da1df4..8211aa04e1d 100644
--- 
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
+++ 
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
@@ -135,6 +135,12 @@ Optional configuration properties:
 `solr.crossdc.expandDbq` _<enum>_ :: If set to `expand` (default) then 
Delete-By-Query will be expanded before mirroring into series of Delete-By-Id, 
which may help with correct processing of out-of-order requests on the consumer 
side.
 If set to `none` then Delete-By-Query requests will be mirrored as-is.
 
+Additional configuration specific to the Kafka client library can be passed 
through using either
+environment variables starting with `SOLR_CROSSDC_KAFKA_` or system properties 
starting with
+`solr.crossdc.kafka.`. For example, to adjust the `max.poll.records` property 
specific to the Kafka client
+library you can set either `SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS` environment 
variable or `solr.crossdc.kafka.max.poll.records`
+system property.
+
 === CrossDC Manager
 
 . Start the Manager process using the included start script at 
`solr/cross-dc-manager/bin/cross-dc-manager` (or `cross-dc-manager.cmd` for 
Windows).

Reply via email to