nfsantos commented on code in PR #1193:
URL: https://github.com/apache/jackrabbit-oak/pull/1193#discussion_r1387961987


##########
oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java:
##########
@@ -34,16 +38,20 @@
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.Test;
 
+import java.io.OutputStream;
+
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
+import static org.mockito.Mockito.when;
 
 public class ElasticIndexerTest {
 
-    private NodeState root = INITIAL_CONTENT;
+    private final NodeState root = INITIAL_CONTENT;

Review Comment:
   Can be `static` as well.



##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -205,39 +205,33 @@ public boolean close() throws IOException {
         return updatesMap.containsValue(Boolean.TRUE);
     }
 
-    private class OakBulkProcessorListener implements BulkProcessor.Listener {
+    private class OAKBulkListener implements BulkListener<String> {
 
         @Override
-        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+        public void beforeBulk(long executionId, BulkRequest request, 
List<String> contexts) {
             // register new bulk party
             phaser.register();
 
             // init update status
             updatesMap.put(executionId, Boolean.FALSE);
 
-            bulkRequest.timeout(TimeValue.timeValueMinutes(2));
-
-            LOG.debug("Sending bulk with id {} -> {}", executionId, 
bulkRequest.getDescription());
+            LOG.debug("Sending bulk with id {} -> {}", executionId, contexts);
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
+                LOG.trace("Bulk Requests: \n{}", request.operations()
                         .stream()
-                        .map(DocWriteRequest::toString)
+                        .map(BulkOperation::toString)
                         .collect(Collectors.joining("\n"))
                 );
             }
         }
 
         @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
BulkResponse bulkResponse) {
-            LOG.debug("Bulk with id {} processed with status {} in {}", 
executionId, bulkResponse.status(), bulkResponse.getTook());
+        public void afterBulk(long executionId, BulkRequest request, 
List<String> contexts, co.elastic.clients.elasticsearch.core.BulkResponse 
response) {
+            LOG.debug("Bulk with id {} processed in {}", executionId, 
response.took());

Review Comment:
   Specify the units for the time in the log message. Seconds? Milliseconds?



##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -286,23 +280,31 @@ public void afterBulk(long executionId, BulkRequest 
bulkRequest, BulkResponse bu
         }
 
         @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
Throwable throwable) {
-            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, throwable);
-            suppressedExceptions.add(throwable);
+        public void afterBulk(long executionId, BulkRequest request, 
List<String> contexts, Throwable failure) {
+            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, failure);
+            suppressedErrorCauses.add(ErrorCause.of(ec -> {
+                    StringWriter sw = new StringWriter();
+                    PrintWriter pw = new PrintWriter(sw);
+                    failure.printStackTrace(pw);
+                    return 
ec.reason(failure.getMessage()).stackTrace(sw.toString());
+            }));
             phaser.arriveAndDeregister();

Review Comment:
   Shouldn't this call be in a `finally` block so that it is executed even if 
the callback method fails with an exception? The same comment applies to the 
other `afterBulk` method.



##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -286,23 +280,31 @@ public void afterBulk(long executionId, BulkRequest 
bulkRequest, BulkResponse bu
         }
 
         @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
Throwable throwable) {
-            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, throwable);
-            suppressedExceptions.add(throwable);
+        public void afterBulk(long executionId, BulkRequest request, 
List<String> contexts, Throwable failure) {

Review Comment:
   If the methods in this listener fail with an exception, is the exception 
logged? This depends on the logic of the Elastic client, but it's important to 
know if Elastic will print the exception or if it suppresses it. 



##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -205,39 +205,33 @@ public boolean close() throws IOException {
         return updatesMap.containsValue(Boolean.TRUE);
     }
 
-    private class OakBulkProcessorListener implements BulkProcessor.Listener {
+    private class OAKBulkListener implements BulkListener<String> {
 
         @Override
-        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+        public void beforeBulk(long executionId, BulkRequest request, 
List<String> contexts) {
             // register new bulk party
             phaser.register();
 
             // init update status
             updatesMap.put(executionId, Boolean.FALSE);
 
-            bulkRequest.timeout(TimeValue.timeValueMinutes(2));
-
-            LOG.debug("Sending bulk with id {} -> {}", executionId, 
bulkRequest.getDescription());
+            LOG.debug("Sending bulk with id {} -> {}", executionId, contexts);
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
+                LOG.trace("Bulk Requests: \n{}", request.operations()
                         .stream()
-                        .map(DocWriteRequest::toString)
+                        .map(BulkOperation::toString)
                         .collect(Collectors.joining("\n"))
                 );
             }
         }
 
         @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
BulkResponse bulkResponse) {
-            LOG.debug("Bulk with id {} processed with status {} in {}", 
executionId, bulkResponse.status(), bulkResponse.getTook());
+        public void afterBulk(long executionId, BulkRequest request, 
List<String> contexts, co.elastic.clients.elasticsearch.core.BulkResponse 
response) {

Review Comment:
   Why the full package name for `BulkResponse`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to