This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f6504b  Add config & logic to control how updates are collapsed in 
the Consumer. (#97)
0f6504b is described below

commit 0f6504befeabaf916702d67ee06f5942fb99fd56
Author: Andrzej BiaƂecki <[email protected]>
AuthorDate: Mon Dec 18 14:04:05 2023 +0100

    Add config & logic to control how updates are collapsed in the Consumer. 
(#97)
    
    * Set PARTIAL as the default.
---
 CROSSDC.md                                         |  9 ++-
 crossdc-commons/build.gradle                       |  8 +--
 .../apache/solr/crossdc/common/CrossDcConf.java    | 35 ++++++++++++
 .../solr/crossdc/common/KafkaCrossDcConf.java      |  6 ++
 crossdc-consumer/build.gradle                      | 14 ++---
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 43 +++++++++++++-
 .../crossdc/consumer/KafkaCrossDcConsumerTest.java | 65 +++++++++++++++++++++-
 crossdc-producer/build.gradle                      | 18 +++---
 gradle.properties                                  |  2 +
 9 files changed, 174 insertions(+), 26 deletions(-)

diff --git a/CROSSDC.md b/CROSSDC.md
index a4dc8d3..b17b4ff 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -93,7 +93,7 @@ Optional configuration properties:
 - `maxRequestSizeBytes`: (integer) The maximum size of a Kafka queue request 
in bytes - limits the number of requests that will be sent over the queue in a 
single batch.
 - `dqlTopicName`: (string) if not empty then requests that failed processing 
`maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (must 
exist if configured).
 - `mirrorCommits`: (boolean) if "true" then standalone commit requests will be 
mirrored, otherwise they will be processed only locally.
-- `expandDbq`: (enum) if set to "expand" (default) then Delete-By-Query will 
be expanded before mirroring into series of Delete-By-Id, which may help with 
correct processing of out-of-order requests on the consumer side. If set to 
"none" then Delete-By-Query requests will be mirrored as-is.
+- `expandDbq`: (enum) if set to `expand` (default) then Delete-By-Query will 
be expanded before mirroring into series of Delete-By-Id, which may help with 
correct processing of out-of-order requests on the consumer side. If set to 
`none` then Delete-By-Query requests will be mirrored as-is.
 
 #### CrossDC Consumer Application
 
@@ -115,7 +115,9 @@ The required configuration properties are:
 
 Optional configuration properties:
 - `consumerProcessingThreads`: The number of threads used by the consumer to 
concurrently process updates from the Kafka queue.
-- `port`: local port for the API endpoints. Default is 8090.
+- `port`: local port for the API endpoints. Default is `8090`.
+- `collapseUpdates`: (enum) when set to `all` then all incoming update 
requests (up to `maxCollapseRecords`) will be collapsed into a single 
UpdateRequest, as long as their parameters are identical. When set to `partial` 
(default) then only requests without deletions will be collapsed - requests 
with any delete ops will be sent individually in order to preserve ordering of 
updates. When set to `none` the incoming update requests will be sent 
individually without any collapsing. NOTE: req [...]
+- `maxCollapseRecords`: maximum number of incoming update request to collapse 
into a single outgoing request. Default is `500`.
 
 Optional configuration properties used when the consumer must retry by putting 
updates back on the Kafka queue:
 - `batchSizeBytes`: maximum batch size in bytes for the Kafka queue
@@ -138,4 +140,5 @@ To make the Cross DC UpdateProcessor optional in a common 
`solrconfig.xml`, use
 
 ## Limitations
 
-- When `expandDbq` property is set to `expand` (default) then Delete-By-Query 
converts to a series of Delete-By-Id, which can be much less efficient for 
queries matching large numbers of documents. Setting this property to `none` 
results in forwarding a real Delete-By-Query - this reduces the amount of data 
to mirror but may cause different results due to the potential re-ordering of 
failed & re-submitted requests between Consumer and the target Solr.
\ No newline at end of file
+- When `expandDbq` property is set to `expand` (default) then Delete-By-Query 
converts to a series of Delete-By-Id, which can be much less efficient for 
queries matching large numbers of documents. Setting this property to `none` 
results in forwarding a real Delete-By-Query - this reduces the amount of data 
to mirror but may cause different results due to the potential re-ordering of 
failed & re-submitted requests between Consumer and the target Solr.
+- When `collapseUpdates` is set to `all` then multiple requests containing a 
mix of add and delete ops will be collapsed into a single outgoing request. 
This will cause the original ordering of add / delete ops to be lost (because 
Solr processing of an update request always processes all add ops first, and 
only then the delete ops), which may affect the final outcome when some of the 
ops refer to the same document ids.
\ No newline at end of file
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index b8172dc..4d603bc 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -34,20 +34,20 @@ sourceSets {
 }
 
 dependencies {
-    provided 'org.apache.solr:solr-solrj:8.11.2'
-    implementation 'org.apache.kafka:kafka-clients:3.5.1'
+    provided "org.apache.solr:solr-solrj:${solrVersion}"
+    implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
     implementation 'com.google.guava:guava:14.0'
     testImplementation 'org.slf4j:slf4j-api:2.0.5'
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
     testImplementation('org.mockito:mockito-inline:5.2.0')
 
-    testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
'8.11.2', {
+    testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
"${solrVersion}", {
         exclude group: "org.eclipse.jetty", module: "jetty-http"
         exclude group: "org.eclipse.jetty", module: "jetty-server"
         exclude group: "org.eclipse.jetty", module: "jetty-servlet"
     }
-    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: '8.11.2'
+    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: "${solrVersion}"
 }
 
 jar.enabled = false
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 4abe7a8..ca23847 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -24,9 +24,16 @@ public abstract class CrossDcConf {
     public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
     public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
     public static final String EXPAND_DBQ = "expandDbq";
+    public static final String COLLAPSE_UPDATES = "collapseUpdates";
+    public static final String MAX_COLLAPSE_RECORDS = "maxCollapseRecords";
 
+    /**
+     * Option to expand Delete-By-Query requests on the producer side.
+     */
     public enum ExpandDbq {
+        /** Don't expand DBQs, mirror them as-is. */
         NONE,
+        /** Expand DBQs into multiple Delete-By-Id requests using matching 
documents on the producer side. */
         EXPAND;
 
         private static final Map<String, ExpandDbq> valueMap = new HashMap<>();
@@ -44,4 +51,32 @@ public abstract class CrossDcConf {
             return value != null ? value : defaultValue;
         }
     }
+
+    /**
+     * Option to collapse multiple update requests in the Consumer application 
before sending
+     * to the target Solr.
+     */
+    public enum CollapseUpdates {
+        /** Don't collapse any update requests, send them directly as-is. */
+        NONE,
+        /** Collapse only update requests that don't contain any delete 
operations. */
+        PARTIAL,
+        /** Always collapse update requests, as long as they have the same 
parameters. */
+        ALL;
+
+        private static final Map<String, CollapseUpdates> valueMap = new 
HashMap<>();
+        static {
+            for (CollapseUpdates value : values()) {
+                valueMap.put(value.name().toUpperCase(Locale.ROOT), value);
+            }
+        }
+
+        public static CollapseUpdates getOrDefault(String strValue, 
CollapseUpdates defaultValue) {
+            if (strValue == null || strValue.isBlank()) {
+                return defaultValue;
+            }
+            CollapseUpdates value = 
valueMap.get(strValue.toUpperCase(Locale.ROOT));
+            return value != null ? value : defaultValue;
+        }
+    }
 }
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index f94166e..9e6d45d 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -65,6 +65,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   private static final String DEFAULT_EXPAND_DBQ = ExpandDbq.EXPAND.name();
 
+  private static final String DEFAULT_COLLAPSE_UPDATES = 
CollapseUpdates.PARTIAL.name();
+
+  private static final String DEFAULT_MAX_COLLAPSE_RECORDS = "500";
+
   public static final String TOPIC_NAME = "topicName";
 
   public static final String DLQ_TOPIC_NAME = "dlqTopicName";
@@ -164,6 +168,8 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(MIRROR_COLLECTIONS, DEFAULT_MIRROR_COLLECTIONS),
             new ConfigProperty(MIRROR_COMMITS, DEFAULT_MIRROR_COMMITS),
             new ConfigProperty(EXPAND_DBQ, DEFAULT_EXPAND_DBQ),
+            new ConfigProperty(COLLAPSE_UPDATES, DEFAULT_COLLAPSE_UPDATES),
+            new ConfigProperty(MAX_COLLAPSE_RECORDS, 
DEFAULT_MAX_COLLAPSE_RECORDS),
 
             new ConfigProperty(MAX_PARTITION_FETCH_BYTES, 
DEFAULT_MAX_PARTITION_FETCH_BYTES),
             new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 6cdbc04..f4109cf 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -29,7 +29,7 @@ application {
 }
 
 dependencies {
-    implementation group: 'org.apache.solr', name: 'solr-solrj', version: 
'8.11.2'
+    implementation group: 'org.apache.solr', name: 'solr-solrj', version: 
"${solrVersion}"
     implementation project(path: ':crossdc-commons', configuration: 'shadow')
 
     implementation 'io.dropwizard.metrics:metrics-core:4.2.17'
@@ -50,21 +50,21 @@ dependencies {
 
     testImplementation  project(':crossdc-producer')
 
-    testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
'8.11.2', {
+    testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
"${solrVersion}", {
         exclude group: "org.apache.logging.log4j", module: "*"
         exclude group: "org.slf4j", module: "*"
         exclude group: "org.eclipse.jetty", module: "jetty-http"
         exclude group: "org.eclipse.jetty", module: "jetty-server"
         exclude group: "org.eclipse.jetty", module: "jetty-servlet"
     }
-    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: '8.11.2', {
+    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: "${solrVersion}", {
         exclude group: "org.apache.logging.log4j", module: "*"
         exclude group: "org.slf4j", module: "*"
     }
-    implementation 'org.apache.kafka:kafka_2.13:3.5.1'
-    implementation 'org.apache.kafka:kafka-streams:3.5.1'
-    testImplementation 'org.apache.kafka:kafka_2.13:3.5.1:test'
-    testImplementation 'org.apache.kafka:kafka-streams:3.5.1:test'
+    implementation "org.apache.kafka:kafka_2.13:${kafkaVersion}"
+    implementation "org.apache.kafka:kafka-streams:${kafkaVersion}"
+    testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}:test"
+    testImplementation "org.apache.kafka:kafka-streams:${kafkaVersion}:test"
 }
 
 test {
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index c226a45..d5e8bf8 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -46,6 +46,8 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
   private final String[] topicNames;
   private final int maxAttempts;
+  private final CrossDcConf.CollapseUpdates collapseUpdates;
+  private final int maxCollapseRecords;
   private final SolrMessageProcessor messageProcessor;
 
   protected final CloudSolrClient solrClient;
@@ -71,6 +73,8 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
     this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
     this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
+    this.collapseUpdates = 
CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES),
 CrossDcConf.CollapseUpdates.PARTIAL);
+    this.maxCollapseRecords = 
conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
     this.startLatch = startLatch;
     final Properties kafkaConsumerProps = new Properties();
 
@@ -193,6 +197,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
       }
 
       UpdateRequest updateReqBatch = null;
+      int currentCollapsed = 0;
 
       ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
 
@@ -222,10 +227,28 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
               log.trace("-- picked type={}, params={}", req.getType(), params);
             }
 
+            // determine if it's an UPDATE with deletes, or if the existing 
batch has deletes
+            boolean hasDeletes = false;
+            if (type == MirroredSolrRequest.Type.UPDATE) {
+              UpdateRequest ureq = (UpdateRequest) solrReq;
+              hasDeletes = hasDeletes(ureq) || hasDeletes(updateReqBatch);
+            }
+
             // it's an update but with different params
-            if (type == MirroredSolrRequest.Type.UPDATE && lastUpdateParams != 
null && !lastUpdateParams.toNamedList().equals(params.toNamedList())) {
+            if (type == MirroredSolrRequest.Type.UPDATE &&
+                  (
+                    // different params
+                    (lastUpdateParams != null && 
!lastUpdateParams.toNamedList().equals(params.toNamedList())) ||
+                    // no collapsing
+                    (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) ||
+                    // partial collapsing but has deletes
+                    (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && 
hasDeletes) ||
+                    // too many collapsed - emit
+                    currentCollapsed >= maxCollapseRecords
+                  )
+                ) {
               if (log.isTraceEnabled()) {
-                log.trace("SolrParams have changed, starting new 
UpdateRequest, params={}", params);
+                log.trace("Starting new UpdateRequest, params={}", params);
               }
               // send previous batch, if any
               if (updateReqBatch != null) {
@@ -233,6 +256,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
               }
               updateReqBatch = null;
               lastUpdateParamsAsNamedList = null;
+              currentCollapsed = 0;
               workUnit = new PartitionManager.WorkUnit(partition);
               workUnit.nextOffset = 
PartitionManager.getOffsetForPartition(partitionRecords);
               partitionWork.partitionQueue.add(workUnit);
@@ -244,7 +268,14 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
                 // just initialize
                 updateReqBatch = new UpdateRequest();
               } else {
+                if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
+                  throw new RuntimeException("Can't collapse requests.");
+                }
+                if (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && 
hasDeletes) {
+                  throw new RuntimeException("Can't collapse requests with 
deletions.");
+                }
                 metrics.counter(MetricRegistry.name(type.name(), 
"collapsed")).inc();
+                currentCollapsed++;
               }
               UpdateRequest update = (UpdateRequest) solrReq;
               MirroredSolrRequest.setParams(updateReqBatch, params);
@@ -327,6 +358,14 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
     return true;
   }
 
+  private boolean hasDeletes(UpdateRequest ureq) {
+    if (ureq == null) {
+      return false;
+    }
+    return (ureq.getDeleteByIdMap() != null && 
!ureq.getDeleteByIdMap().isEmpty()) ||
+        (ureq.getDeleteQuery() != null && !ureq.getDeleteQuery().isEmpty());
+  }
+
   public void sendBatch(SolrRequest solrReqBatch, MirroredSolrRequest.Type 
type, ConsumerRecord<String, MirroredSolrRequest> lastRecord, 
PartitionManager.WorkUnit workUnit) {
     SolrRequest finalSolrReqBatch = solrReqBatch;
     // Kafka client is not thread-safe !!!
diff --git 
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index 77632f6..6686b67 100644
--- 
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -12,6 +12,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -76,10 +77,15 @@ public class KafkaCrossDcConsumerTest {
                 };
     }
 
-    private static KafkaCrossDcConf testCrossDCConf() {
+    private static KafkaCrossDcConf testCrossDCConf(String... keyValues) {
         Map config = new HashMap<>();
         config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
         config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+        if (keyValues != null) {
+            for (int i = 0; i < keyValues.length; i += 2) {
+                config.put(keyValues[i], keyValues[i + 1]);
+            }
+        }
         return new KafkaCrossDcConf(config);
     }
 
@@ -260,6 +266,63 @@ public class KafkaCrossDcConsumerTest {
         verify(spyConsumer, times(1)).sendBatch(any(), 
eq(MirroredSolrRequest.Type.UPDATE), eq(record2), any());
     }
 
+    @Test
+    public void testCollapseUpdatesNONE() {
+        int NUM_REQS = 100;
+        doTestCollapseUpdates(CrossDcConf.CollapseUpdates.NONE, 500, NUM_REQS, 
NUM_REQS, 0);
+    }
+
+    @Test
+    public void testCollapseUpdatesALL() {
+        int NUM_REQS = 100;
+        doTestCollapseUpdates(CrossDcConf.CollapseUpdates.ALL, 500, NUM_REQS, 
1, 5);
+    }
+
+    @Test
+    public void testCollapseUpdatesALLMaxCollapse() {
+        int NUM_REQS = 100;
+        doTestCollapseUpdates(CrossDcConf.CollapseUpdates.ALL, 50, NUM_REQS, 
2, 5);
+    }
+
+    @Test
+    public void testCollapseUpdatesPARTIAL() {
+        int NUM_REQS = 100;
+        doTestCollapseUpdates(CrossDcConf.CollapseUpdates.PARTIAL, 500, 
NUM_REQS, 10, 5);
+    }
+
+    private void doTestCollapseUpdates(CrossDcConf.CollapseUpdates 
collapseUpdates, int maxCollapseRecords, int inputReqs, int outputReqs, int 
reqsWithDeletes) {
+        KafkaConsumer<String, MirroredSolrRequest> mockConsumer = 
mock(KafkaConsumer.class);
+        // override
+        conf = testCrossDCConf(
+            CrossDcConf.COLLAPSE_UPDATES, collapseUpdates.name(),
+            CrossDcConf.MAX_COLLAPSE_RECORDS, 
String.valueOf(maxCollapseRecords));
+        KafkaCrossDcConsumer spyConsumer = 
createCrossDcConsumerSpy(mockConsumer);
+        doReturn(new 
IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
+        List<ConsumerRecord<String, MirroredSolrRequest>> records = new 
ArrayList<>();
+        for (int i = 0; i < inputReqs; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", "id-" + i);
+            UpdateRequest validRequest = new UpdateRequest();
+            validRequest.add(doc);
+            if ((i % 3) == 0 && reqsWithDeletes > 0) {
+                validRequest.deleteById("fakeId-" + i);
+                reqsWithDeletes--;
+            }
+            // Create a valid MirroredSolrRequest
+            ConsumerRecord<String, MirroredSolrRequest> record = new 
ConsumerRecord<>("test-topic", 0, 0, "key", new 
MirroredSolrRequest(validRequest));
+            records.add(record);
+        }
+        ConsumerRecords<String, MirroredSolrRequest> consumerRecords = new 
ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), 
records));
+
+        
when(mockConsumer.poll(any())).thenReturn(consumerRecords).thenThrow(new 
WakeupException());
+
+        spyConsumer.run();
+
+        // Verify that the valid MirroredSolrRequest was processed.
+        verify(spyConsumer, times(outputReqs)).sendBatch(any(), 
eq(MirroredSolrRequest.Type.UPDATE), any(), any());
+    }
+
+
     @Test
     public void testHandleInvalidMirroredSolrRequest() {
         KafkaConsumer<String, MirroredSolrRequest> mockConsumer = 
mock(KafkaConsumer.class);
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 3371fc5..87b2a06 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -37,21 +37,21 @@ dependencies {
     implementation project(':crossdc-consumer')
     implementation project(path: ':crossdc-commons', configuration: 'shadow')
 
-    provided  group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
+    provided  group: 'org.apache.solr', name: 'solr-core', version: 
"${solrVersion}"
 
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
     testImplementation('org.mockito:mockito-inline:5.1.1')
-    testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
'8.11.2'
-    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: '8.11.2'
+    testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
"${solrVersion}"
+    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: "${solrVersion}"
 
     testImplementation group: 'org.slf4j', name: 'slf4j-simple', version: 
'2.0.9'
-    testImplementation 'org.apache.kafka:kafka-clients:3.5.1:test'
-    testImplementation 'org.apache.kafka:kafka_2.13:3.5.1'
-    testImplementation 'org.apache.kafka:kafka-streams:3.5.1'
-    testImplementation 'org.apache.kafka:kafka_2.13:3.5.1:test'
-    testImplementation 'org.apache.kafka:kafka-streams:3.5.1:test'
-    testImplementation 'org.apache.kafka:kafka-server-common:3.5.1:test'
+    testImplementation "org.apache.kafka:kafka-clients:${kafkaVersion}:test"
+    testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}"
+    testImplementation "org.apache.kafka:kafka-streams:${kafkaVersion}"
+    testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}:test"
+    testImplementation "org.apache.kafka:kafka-streams:${kafkaVersion}:test"
+    testImplementation 
"org.apache.kafka:kafka-server-common:${kafkaVersion}:test"
 
 }
 
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..4e50947
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,2 @@
+solrVersion=8.11.2
+kafkaVersion=3.5.1

Reply via email to