gerlowskija commented on code in PR #61:
URL: https://github.com/apache/solr-sandbox/pull/61#discussion_r1223044484
##########
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java:
##########
@@ -127,58 +125,82 @@ private UpdateRequest createAndOrGetMirrorRequest() {
log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
}
- @Override public void processDelete(final DeleteUpdateCommand cmd) throws
IOException {
- if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+ @Override
+ public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+ String dbqMethod = cmd.getReq().getParams().get("dbqMethod");
+ if (dbqMethod == null) {
+ dbqMethod = defaultDefaultDBQMethod;
+ }
- CloudDescriptor cloudDesc =
- cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+ if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+ CloudDescriptor cloudDesc =
cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
-
HttpClient httpClient =
cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
- try (HttpSolrClient client =
- new
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
{
-
+ try (HttpSolrClient client = new
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
{
String uniqueField =
cmd.getReq().getSchema().getUniqueKeyField().getName();
- int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
- SolrQuery q = new
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
- String cursorMark = CursorMarkParams.CURSOR_MARK_START;
-
- int cnt = 1;
- boolean done = false;
- while (!done) {
- q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
- QueryResponse rsp =
- client.query(collection, q);
- String nextCursorMark = rsp.getNextCursorMark();
-
- if (log.isDebugEnabled()) {
- log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark,
nextCursorMark, cnt,
- rsp.getResults());
- cnt++;
+ if (dbqMethod == null || dbqMethod.equals("default")) {
+
+ int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+ SolrQuery q = new
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+ String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+ int cnt = 1;
+ boolean done = false;
+ while (!done) {
+ q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+ QueryResponse rsp =
+ client.query(collection, q);
+ String nextCursorMark = rsp.getNextCursorMark();
+
+ if (log.isDebugEnabled()) {
+ log.debug("resp: cm={}, ncm={}, cnt={}, results={} ",
cursorMark, nextCursorMark, cnt,
+ rsp.getResults());
+ cnt++;
+ }
+
+ processDBQResults(client, collection, uniqueField, rsp);
+ if (cursorMark.equals(nextCursorMark)) {
+ done = true;
+ }
+ cursorMark = nextCursorMark;
}
-
- processDBQResults(client, collection, uniqueField, rsp);
- if (cursorMark.equals(nextCursorMark)) {
- done = true;
+ } else if (dbqMethod.equals("convert_no_paging")) {
Review Comment:
[0] Hardly matters, but it might be nice to have these constants be an enum so
the set of acceptable values is a little more explicit.
##########
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java:
##########
@@ -127,58 +125,82 @@ private UpdateRequest createAndOrGetMirrorRequest() {
log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
}
- @Override public void processDelete(final DeleteUpdateCommand cmd) throws
IOException {
- if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+ @Override
+ public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+ String dbqMethod = cmd.getReq().getParams().get("dbqMethod");
+ if (dbqMethod == null) {
+ dbqMethod = defaultDefaultDBQMethod;
+ }
- CloudDescriptor cloudDesc =
- cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+ if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+ CloudDescriptor cloudDesc =
cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
-
HttpClient httpClient =
cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
- try (HttpSolrClient client =
- new
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
{
-
+ try (HttpSolrClient client = new
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
{
String uniqueField =
cmd.getReq().getSchema().getUniqueKeyField().getName();
- int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
- SolrQuery q = new
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
- String cursorMark = CursorMarkParams.CURSOR_MARK_START;
-
- int cnt = 1;
- boolean done = false;
- while (!done) {
- q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
- QueryResponse rsp =
- client.query(collection, q);
- String nextCursorMark = rsp.getNextCursorMark();
-
- if (log.isDebugEnabled()) {
- log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark,
nextCursorMark, cnt,
- rsp.getResults());
- cnt++;
+ if (dbqMethod == null || dbqMethod.equals("default")) {
+
+ int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+ SolrQuery q = new
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+ String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+ int cnt = 1;
+ boolean done = false;
+ while (!done) {
+ q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+ QueryResponse rsp =
+ client.query(collection, q);
+ String nextCursorMark = rsp.getNextCursorMark();
+
+ if (log.isDebugEnabled()) {
+ log.debug("resp: cm={}, ncm={}, cnt={}, results={} ",
cursorMark, nextCursorMark, cnt,
+ rsp.getResults());
+ cnt++;
+ }
+
+ processDBQResults(client, collection, uniqueField, rsp);
+ if (cursorMark.equals(nextCursorMark)) {
+ done = true;
+ }
+ cursorMark = nextCursorMark;
}
-
- processDBQResults(client, collection, uniqueField, rsp);
- if (cursorMark.equals(nextCursorMark)) {
- done = true;
+ } else if (dbqMethod.equals("convert_no_paging")) {
+ int rows = 10000;
+ SolrQuery q = new
SolrQuery(cmd.query).setRows(rows).setFields(uniqueField);
Review Comment:
[Q] I wonder if we could use a higher `rows` value if we fetched the documents
using the /export handler rather than /select?
Or even better: a `delete(..., search(...))` streaming expression would use
the /export handler implicitly, and batch and send the delete-by-ID requests
for us. It looks like that `delete` streaming expression was built with this
specific use case in mind. (see
[here](https://issues.apache.org/jira/browse/SOLR-14241) and
[here](https://solr.apache.org/guide/8_7/stream-decorator-reference.html#delete)
)
##########
crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryIntegrationTest.java:
##########
@@ -0,0 +1,428 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+
+@ThreadLeakFilters(defaultFilters = true, filters =
{SolrIgnoredThreadsFilter.class,
+ QuickPatchThreadsFilter.class,
SolrKafkaTestsIgnoredThreadsFilter.class})
+@ThreadLeakLingering(linger = 5000)
+public class DeleteByQueryIntegrationTest extends
+ SolrTestCaseJ4 {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int MAX_MIRROR_BATCH_SIZE_BYTES =
Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE);
+ private static final int MAX_DOC_SIZE_BYTES = MAX_MIRROR_BATCH_SIZE_BYTES;
+
+ static final String VERSION_FIELD = "_version_";
+
+ private static final int NUM_BROKERS = 1;
+ public static EmbeddedKafkaCluster kafkaCluster;
+
+ protected static volatile MiniSolrCloudCluster solrCluster1;
+ protected static volatile MiniSolrCloudCluster solrCluster2;
+
+ protected static volatile Consumer consumer = new Consumer();
+
+ private static String TOPIC = "topic1";
+
+ private static String COLLECTION = "collection1";
+ private static String ALT_COLLECTION = "collection2";
+
+ @BeforeClass
+ public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
Review Comment:
[Q] Is there a reason this test class doesn't extend
`SolrAndKafkaIntegrationTest`? Maybe it differs in subtle ways, but from a
skim it seems like we're duplicating a lot of logic from that base class here.
##########
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java:
##########
@@ -249,8 +249,10 @@ public UpdateRequestProcessor getInstance(final
SolrQueryRequest req, final Solr
log.trace("Create MirroringUpdateProcessor with
mirroredParams={}", mirroredParams);
}
+ String defaultDefaultDBQMethod =
conf.get(KafkaCrossDcConf.DEFAULT_DBQ_METHOD);
Review Comment:
[0] Dumb question, but should all these variable names really start with
"defaultDefault" instead of just "default"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]