This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 0f6504b Add config & logic to control how updates are collapsed in
the Consumer. (#97)
0f6504b is described below
commit 0f6504befeabaf916702d67ee06f5942fb99fd56
Author: Andrzej Białecki <[email protected]>
AuthorDate: Mon Dec 18 14:04:05 2023 +0100
Add config & logic to control how updates are collapsed in the Consumer.
(#97)
* Set PARTIAL as the default.
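For reference, a minimal sketch of the two new Consumer options as they could
appear in crossdc.properties (key names and defaults are taken from the patch
below; value lookup is case-insensitive):

    # collapse only delete-free update requests (the new default)
    collapseUpdates=partial
    # cap on how many incoming requests may be merged into one outgoing request
    maxCollapseRecords=500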
---
CROSSDC.md | 9 ++-
crossdc-commons/build.gradle | 8 +--
.../apache/solr/crossdc/common/CrossDcConf.java | 35 ++++++++++++
.../solr/crossdc/common/KafkaCrossDcConf.java | 6 ++
crossdc-consumer/build.gradle | 14 ++---
.../crossdc/consumer/KafkaCrossDcConsumer.java | 43 +++++++++++++-
.../crossdc/consumer/KafkaCrossDcConsumerTest.java | 65 +++++++++++++++++++++-
crossdc-producer/build.gradle | 18 +++---
gradle.properties | 2 +
9 files changed, 174 insertions(+), 26 deletions(-)
diff --git a/CROSSDC.md b/CROSSDC.md
index a4dc8d3..b17b4ff 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -93,7 +93,7 @@ Optional configuration properties:
- `maxRequestSizeBytes`: (integer) The maximum size of a Kafka queue request
in bytes - limits the number of requests that will be sent over the queue in a
single batch.
- `dlqTopicName`: (string) if not empty then requests that failed processing
`maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (must
exist if configured).
- `mirrorCommits`: (boolean) if "true" then standalone commit requests will be
mirrored, otherwise they will be processed only locally.
-- `expandDbq`: (enum) if set to "expand" (default) then Delete-By-Query will
be expanded before mirroring into series of Delete-By-Id, which may help with
correct processing of out-of-order requests on the consumer side. If set to
"none" then Delete-By-Query requests will be mirrored as-is.
+- `expandDbq`: (enum) if set to `expand` (default) then Delete-By-Query will
be expanded before mirroring into series of Delete-By-Id, which may help with
correct processing of out-of-order requests on the consumer side. If set to
`none` then Delete-By-Query requests will be mirrored as-is.
#### CrossDC Consumer Application
@@ -115,7 +115,9 @@ The required configuration properties are:
Optional configuration properties:
- `consumerProcessingThreads`: The number of threads used by the consumer to
concurrently process updates from the Kafka queue.
-- `port`: local port for the API endpoints. Default is 8090.
+- `port`: local port for the API endpoints. Default is `8090`.
+- `collapseUpdates`: (enum) when set to `all` then all incoming update
requests (up to `maxCollapseRecords`) will be collapsed into a single
UpdateRequest, as long as their parameters are identical. When set to `partial`
(default) then only requests without deletions will be collapsed - requests
with any delete ops will be sent individually in order to preserve ordering of
updates. When set to `none` the incoming update requests will be sent
individually without any collapsing. NOTE: req [...]
+- `maxCollapseRecords`: maximum number of incoming update requests to collapse
into a single outgoing request. Default is `500`.
Optional configuration properties used when the consumer must retry by putting
updates back on the Kafka queue:
- `batchSizeBytes`: maximum batch size in bytes for the Kafka queue
@@ -138,4 +140,5 @@ To make the Cross DC UpdateProcessor optional in a common
`solrconfig.xml`, use
## Limitations
-- When `expandDbq` property is set to `expand` (default) then Delete-By-Query
converts to a series of Delete-By-Id, which can be much less efficient for
queries matching large numbers of documents. Setting this property to `none`
results in forwarding a real Delete-By-Query - this reduces the amount of data
to mirror but may cause different results due to the potential re-ordering of
failed & re-submitted requests between Consumer and the target Solr.
\ No newline at end of file
+- When `expandDbq` property is set to `expand` (default) then Delete-By-Query
converts to a series of Delete-By-Id, which can be much less efficient for
queries matching large numbers of documents. Setting this property to `none`
results in forwarding a real Delete-By-Query - this reduces the amount of data
to mirror but may cause different results due to the potential re-ordering of
failed & re-submitted requests between Consumer and the target Solr.
+- When `collapseUpdates` is set to `all` then multiple requests containing a
mix of add and delete ops will be collapsed into a single outgoing request.
This will cause the original ordering of add / delete ops to be lost (because
Solr processing of an update request always processes all add ops first, and
only then the delete ops), which may affect the final outcome when some of the
ops refer to the same document ids.
\ No newline at end of file
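To make the last limitation concrete, here is a hypothetical SolrJ snippet
(the `docWithId` helper is invented for illustration) showing how collapsing
with `all` can invert a delete-then-add sequence:

    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.SolrInputDocument;

    static SolrInputDocument docWithId(String id) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", id);
      return doc;
    }

    // Original order: delete "doc-1", then re-add it -> the document survives.
    UpdateRequest first = new UpdateRequest();
    first.deleteById("doc-1");
    UpdateRequest second = new UpdateRequest();
    second.add(docWithId("doc-1"));

    // Collapsed into one request, as collapseUpdates=all would do:
    UpdateRequest collapsed = new UpdateRequest();
    collapsed.deleteById("doc-1");
    collapsed.add(docWithId("doc-1"));
    // Solr processes all add ops in a request before the delete ops,
    // so "doc-1" is added first and then deleted - the opposite outcome.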
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index b8172dc..4d603bc 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -34,20 +34,20 @@ sourceSets {
}
dependencies {
- provided 'org.apache.solr:solr-solrj:8.11.2'
- implementation 'org.apache.kafka:kafka-clients:3.5.1'
+ provided "org.apache.solr:solr-solrj:${solrVersion}"
+ implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
implementation 'com.google.guava:guava:14.0'
testImplementation 'org.slf4j:slf4j-api:2.0.5'
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
testImplementation('org.mockito:mockito-inline:5.2.0')
- testImplementation group: 'org.apache.solr', name: 'solr-core', version:
'8.11.2', {
+ testImplementation group: 'org.apache.solr', name: 'solr-core', version:
"${solrVersion}", {
exclude group: "org.eclipse.jetty", module: "jetty-http"
exclude group: "org.eclipse.jetty", module: "jetty-server"
exclude group: "org.eclipse.jetty", module: "jetty-servlet"
}
- testImplementation group: 'org.apache.solr', name: 'solr-test-framework',
version: '8.11.2'
+ testImplementation group: 'org.apache.solr', name: 'solr-test-framework',
version: "${solrVersion}"
}
jar.enabled = false
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 4abe7a8..ca23847 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -24,9 +24,16 @@ public abstract class CrossDcConf {
public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
public static final String EXPAND_DBQ = "expandDbq";
+ public static final String COLLAPSE_UPDATES = "collapseUpdates";
+ public static final String MAX_COLLAPSE_RECORDS = "maxCollapseRecords";
+ /**
+ * Option to expand Delete-By-Query requests on the producer side.
+ */
public enum ExpandDbq {
+ /** Don't expand DBQs, mirror them as-is. */
NONE,
+ /** Expand DBQs into multiple Delete-By-Id requests using matching
documents on the producer side. */
EXPAND;
private static final Map<String, ExpandDbq> valueMap = new HashMap<>();
@@ -44,4 +51,32 @@ public abstract class CrossDcConf {
return value != null ? value : defaultValue;
}
}
+
+ /**
+ * Option to collapse multiple update requests in the Consumer application
before sending
+ * to the target Solr.
+ */
+ public enum CollapseUpdates {
+ /** Don't collapse any update requests, send them directly as-is. */
+ NONE,
+ /** Collapse only update requests that don't contain any delete
operations. */
+ PARTIAL,
+ /** Always collapse update requests, as long as they have the same
parameters. */
+ ALL;
+
+ private static final Map<String, CollapseUpdates> valueMap = new
HashMap<>();
+ static {
+ for (CollapseUpdates value : values()) {
+ valueMap.put(value.name().toUpperCase(Locale.ROOT), value);
+ }
+ }
+
+ public static CollapseUpdates getOrDefault(String strValue,
CollapseUpdates defaultValue) {
+ if (strValue == null || strValue.isBlank()) {
+ return defaultValue;
+ }
+ CollapseUpdates value =
valueMap.get(strValue.toUpperCase(Locale.ROOT));
+ return value != null ? value : defaultValue;
+ }
+ }
}
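A quick usage sketch of the new enum (behavior follows directly from the code
above: lookup is case-insensitive, and null, blank, or unrecognized values fall
back to the supplied default):

    CrossDcConf.CollapseUpdates mode =
        CrossDcConf.CollapseUpdates.getOrDefault("Partial", CrossDcConf.CollapseUpdates.PARTIAL);
    // "Partial", "PARTIAL" and "partial" all map to PARTIAL;
    // "bogus", "" and null all yield the default passed in.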
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index f94166e..9e6d45d 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -65,6 +65,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
private static final String DEFAULT_EXPAND_DBQ = ExpandDbq.EXPAND.name();
+ private static final String DEFAULT_COLLAPSE_UPDATES =
CollapseUpdates.PARTIAL.name();
+
+ private static final String DEFAULT_MAX_COLLAPSE_RECORDS = "500";
+
public static final String TOPIC_NAME = "topicName";
public static final String DLQ_TOPIC_NAME = "dlqTopicName";
@@ -164,6 +168,8 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(MIRROR_COLLECTIONS, DEFAULT_MIRROR_COLLECTIONS),
new ConfigProperty(MIRROR_COMMITS, DEFAULT_MIRROR_COMMITS),
new ConfigProperty(EXPAND_DBQ, DEFAULT_EXPAND_DBQ),
+ new ConfigProperty(COLLAPSE_UPDATES, DEFAULT_COLLAPSE_UPDATES),
+ new ConfigProperty(MAX_COLLAPSE_RECORDS,
DEFAULT_MAX_COLLAPSE_RECORDS),
new ConfigProperty(MAX_PARTITION_FETCH_BYTES,
DEFAULT_MAX_PARTITION_FETCH_BYTES),
new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 6cdbc04..f4109cf 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -29,7 +29,7 @@ application {
}
dependencies {
- implementation group: 'org.apache.solr', name: 'solr-solrj', version:
'8.11.2'
+ implementation group: 'org.apache.solr', name: 'solr-solrj', version:
"${solrVersion}"
implementation project(path: ':crossdc-commons', configuration: 'shadow')
implementation 'io.dropwizard.metrics:metrics-core:4.2.17'
@@ -50,21 +50,21 @@ dependencies {
testImplementation project(':crossdc-producer')
- testImplementation group: 'org.apache.solr', name: 'solr-core', version:
'8.11.2', {
+ testImplementation group: 'org.apache.solr', name: 'solr-core', version:
"${solrVersion}", {
exclude group: "org.apache.logging.log4j", module: "*"
exclude group: "org.slf4j", module: "*"
exclude group: "org.eclipse.jetty", module: "jetty-http"
exclude group: "org.eclipse.jetty", module: "jetty-server"
exclude group: "org.eclipse.jetty", module: "jetty-servlet"
}
- testImplementation group: 'org.apache.solr', name: 'solr-test-framework',
version: '8.11.2', {
+ testImplementation group: 'org.apache.solr', name: 'solr-test-framework',
version: "${solrVersion}", {
exclude group: "org.apache.logging.log4j", module: "*"
exclude group: "org.slf4j", module: "*"
}
- implementation 'org.apache.kafka:kafka_2.13:3.5.1'
- implementation 'org.apache.kafka:kafka-streams:3.5.1'
- testImplementation 'org.apache.kafka:kafka_2.13:3.5.1:test'
- testImplementation 'org.apache.kafka:kafka-streams:3.5.1:test'
+ implementation "org.apache.kafka:kafka_2.13:${kafkaVersion}"
+ implementation "org.apache.kafka:kafka-streams:${kafkaVersion}"
+ testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}:test"
+ testImplementation "org.apache.kafka:kafka-streams:${kafkaVersion}:test"
}
test {
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index c226a45..d5e8bf8 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -46,6 +46,8 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
private final String[] topicNames;
private final int maxAttempts;
+ private final CrossDcConf.CollapseUpdates collapseUpdates;
+ private final int maxCollapseRecords;
private final SolrMessageProcessor messageProcessor;
protected final CloudSolrClient solrClient;
@@ -71,6 +73,8 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
+ this.collapseUpdates =
CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES),
CrossDcConf.CollapseUpdates.PARTIAL);
+ this.maxCollapseRecords =
conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
this.startLatch = startLatch;
final Properties kafkaConsumerProps = new Properties();
@@ -193,6 +197,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
}
UpdateRequest updateReqBatch = null;
+ int currentCollapsed = 0;
ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
@@ -222,10 +227,28 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.trace("-- picked type={}, params={}", req.getType(), params);
}
+ // determine if it's an UPDATE with deletes, or if the existing
batch has deletes
+ boolean hasDeletes = false;
+ if (type == MirroredSolrRequest.Type.UPDATE) {
+ UpdateRequest ureq = (UpdateRequest) solrReq;
+ hasDeletes = hasDeletes(ureq) || hasDeletes(updateReqBatch);
+ }
+
// it's an update but with different params
- if (type == MirroredSolrRequest.Type.UPDATE && lastUpdateParams !=
null && !lastUpdateParams.toNamedList().equals(params.toNamedList())) {
+ if (type == MirroredSolrRequest.Type.UPDATE &&
+ (
+ // different params
+ (lastUpdateParams != null &&
!lastUpdateParams.toNamedList().equals(params.toNamedList())) ||
+ // no collapsing
+ (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) ||
+ // partial collapsing but has deletes
+ (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL &&
hasDeletes) ||
+ // too many collapsed - emit
+ currentCollapsed >= maxCollapseRecords
+ )
+ ) {
if (log.isTraceEnabled()) {
- log.trace("SolrParams have changed, starting new
UpdateRequest, params={}", params);
+ log.trace("Starting new UpdateRequest, params={}", params);
}
// send previous batch, if any
if (updateReqBatch != null) {
@@ -233,6 +256,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
}
updateReqBatch = null;
lastUpdateParamsAsNamedList = null;
+ currentCollapsed = 0;
workUnit = new PartitionManager.WorkUnit(partition);
workUnit.nextOffset =
PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
@@ -244,7 +268,14 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
// just initialize
updateReqBatch = new UpdateRequest();
} else {
+ if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
+ throw new RuntimeException("Can't collapse requests.");
+ }
+ if (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL &&
hasDeletes) {
+ throw new RuntimeException("Can't collapse requests with
deletions.");
+ }
metrics.counter(MetricRegistry.name(type.name(),
"collapsed")).inc();
+ currentCollapsed++;
}
UpdateRequest update = (UpdateRequest) solrReq;
MirroredSolrRequest.setParams(updateReqBatch, params);
@@ -327,6 +358,14 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
return true;
}
+ private boolean hasDeletes(UpdateRequest ureq) {
+ if (ureq == null) {
+ return false;
+ }
+ return (ureq.getDeleteByIdMap() != null &&
!ureq.getDeleteByIdMap().isEmpty()) ||
+ (ureq.getDeleteQuery() != null && !ureq.getDeleteQuery().isEmpty());
+ }
+
public void sendBatch(SolrRequest solrReqBatch, MirroredSolrRequest.Type
type, ConsumerRecord<String, MirroredSolrRequest> lastRecord,
PartitionManager.WorkUnit workUnit) {
SolrRequest finalSolrReqBatch = solrReqBatch;
// Kafka client is not thread-safe !!!
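The batching rules added above can be summarized in a small standalone sketch
(a hypothetical helper, not part of the patch) of when the consumer flushes
the current batch and starts a new UpdateRequest:

    // Returns true when the pending batch must be sent before handling the next record.
    static boolean startNewBatch(CrossDcConf.CollapseUpdates mode,
                                 boolean paramsChanged, boolean hasDeletes,
                                 int currentCollapsed, int maxCollapseRecords) {
      return paramsChanged                                               // params differ from the batch
          || mode == CrossDcConf.CollapseUpdates.NONE                    // collapsing disabled
          || (mode == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes) // deletes end a PARTIAL batch
          || currentCollapsed >= maxCollapseRecords;                     // batch reached the cap
    }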
diff --git
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index 77632f6..6686b67 100644
---
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -12,6 +12,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -76,10 +77,15 @@ public class KafkaCrossDcConsumerTest {
};
}
- private static KafkaCrossDcConf testCrossDCConf() {
+ private static KafkaCrossDcConf testCrossDCConf(String... keyValues) {
Map config = new HashMap<>();
config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+ if (keyValues != null) {
+ for (int i = 0; i < keyValues.length; i += 2) {
+ config.put(keyValues[i], keyValues[i + 1]);
+ }
+ }
return new KafkaCrossDcConf(config);
}
@@ -260,6 +266,63 @@ public class KafkaCrossDcConsumerTest {
verify(spyConsumer, times(1)).sendBatch(any(),
eq(MirroredSolrRequest.Type.UPDATE), eq(record2), any());
}
+ @Test
+ public void testCollapseUpdatesNONE() {
+ int NUM_REQS = 100;
+ doTestCollapseUpdates(CrossDcConf.CollapseUpdates.NONE, 500, NUM_REQS,
NUM_REQS, 0);
+ }
+
+ @Test
+ public void testCollapseUpdatesALL() {
+ int NUM_REQS = 100;
+ doTestCollapseUpdates(CrossDcConf.CollapseUpdates.ALL, 500, NUM_REQS,
1, 5);
+ }
+
+ @Test
+ public void testCollapseUpdatesALLMaxCollapse() {
+ int NUM_REQS = 100;
+ doTestCollapseUpdates(CrossDcConf.CollapseUpdates.ALL, 50, NUM_REQS,
2, 5);
+ }
+
+ @Test
+ public void testCollapseUpdatesPARTIAL() {
+ int NUM_REQS = 100;
+ doTestCollapseUpdates(CrossDcConf.CollapseUpdates.PARTIAL, 500,
NUM_REQS, 10, 5);
+ }
+
+ private void doTestCollapseUpdates(CrossDcConf.CollapseUpdates
collapseUpdates, int maxCollapseRecords, int inputReqs, int outputReqs, int
reqsWithDeletes) {
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
+ // override
+ conf = testCrossDCConf(
+ CrossDcConf.COLLAPSE_UPDATES, collapseUpdates.name(),
+ CrossDcConf.MAX_COLLAPSE_RECORDS,
String.valueOf(maxCollapseRecords));
+ KafkaCrossDcConsumer spyConsumer =
createCrossDcConsumerSpy(mockConsumer);
+ doReturn(new
IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
+ List<ConsumerRecord<String, MirroredSolrRequest>> records = new
ArrayList<>();
+ for (int i = 0; i < inputReqs; i++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "id-" + i);
+ UpdateRequest validRequest = new UpdateRequest();
+ validRequest.add(doc);
+ if ((i % 3) == 0 && reqsWithDeletes > 0) {
+ validRequest.deleteById("fakeId-" + i);
+ reqsWithDeletes--;
+ }
+ // Create a valid MirroredSolrRequest
+ ConsumerRecord<String, MirroredSolrRequest> record = new
ConsumerRecord<>("test-topic", 0, 0, "key", new
MirroredSolrRequest(validRequest));
+ records.add(record);
+ }
+ ConsumerRecords<String, MirroredSolrRequest> consumerRecords = new
ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0),
records));
+
+
when(mockConsumer.poll(any())).thenReturn(consumerRecords).thenThrow(new
WakeupException());
+
+ spyConsumer.run();
+
+ // Verify that the valid MirroredSolrRequest was processed.
+ verify(spyConsumer, times(outputReqs)).sendBatch(any(),
eq(MirroredSolrRequest.Type.UPDATE), any(), any());
+ }
+
+
@Test
public void testHandleInvalidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
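A sanity check on the expected counts in doTestCollapseUpdates above (by one
reading of the consumer loop): in the PARTIAL case, deletes are attached to
requests i = 0, 3, 6, 9 and 12, and each delete-carrying request both flushes
the pending batch and is itself flushed before the next record, yielding the
batches {0}, {1,2}, {3}, {4,5}, {6}, {7,8}, {9}, {10,11}, {12} and {13..99} -
10 sends, matching outputReqs = 10. Similarly, ALL with maxCollapseRecords =
50 splits the 100 records into 2 batches, and NONE sends all 100 individually.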
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 3371fc5..87b2a06 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -37,21 +37,21 @@ dependencies {
implementation project(':crossdc-consumer')
implementation project(path: ':crossdc-commons', configuration: 'shadow')
- provided group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
+ provided group: 'org.apache.solr', name: 'solr-core', version:
"${solrVersion}"
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
testImplementation('org.mockito:mockito-inline:5.1.1')
- testImplementation group: 'org.apache.solr', name: 'solr-core', version:
'8.11.2'
- testImplementation group: 'org.apache.solr', name: 'solr-test-framework',
version: '8.11.2'
+ testImplementation group: 'org.apache.solr', name: 'solr-core', version:
"${solrVersion}"
+ testImplementation group: 'org.apache.solr', name: 'solr-test-framework',
version: "${solrVersion}"
testImplementation group: 'org.slf4j', name: 'slf4j-simple', version:
'2.0.9'
- testImplementation 'org.apache.kafka:kafka-clients:3.5.1:test'
- testImplementation 'org.apache.kafka:kafka_2.13:3.5.1'
- testImplementation 'org.apache.kafka:kafka-streams:3.5.1'
- testImplementation 'org.apache.kafka:kafka_2.13:3.5.1:test'
- testImplementation 'org.apache.kafka:kafka-streams:3.5.1:test'
- testImplementation 'org.apache.kafka:kafka-server-common:3.5.1:test'
+ testImplementation "org.apache.kafka:kafka-clients:${kafkaVersion}:test"
+ testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}"
+ testImplementation "org.apache.kafka:kafka-streams:${kafkaVersion}"
+ testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}:test"
+ testImplementation "org.apache.kafka:kafka-streams:${kafkaVersion}:test"
+ testImplementation
"org.apache.kafka:kafka-server-common:${kafkaVersion}:test"
}
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..4e50947
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,2 @@
+solrVersion=8.11.2
+kafkaVersion=3.5.1
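These two properties centralize the dependency versions referenced from the
build files above. Assuming standard Gradle project-property behavior (not
stated in the patch itself), either value can also be overridden per
invocation, e.g. ./gradlew test -PsolrVersion=8.11.2 -PkafkaVersion=3.5.1.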