rzo1 commented on code in PR #1869:
URL: https://github.com/apache/stormcrawler/pull/1869#discussion_r3040769321


##########
external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.opensearch;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.metric.api.MultiCountMetric;
+import org.apache.storm.tuple.Tuple;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.core.rest.RestStatus;
+import org.slf4j.LoggerFactory;
+
+class WaitAckCacheTest {
+
+    private WaitAckCache cache;
+    private List<Tuple> evicted;
+    private List<Tuple> acked;
+    private List<Tuple> failed;
+
+    @BeforeEach
+    void setUp() {
+        evicted = new CopyOnWriteArrayList<>();
+        acked = new ArrayList<>();
+        failed = new ArrayList<>();
+        cache = new 
WaitAckCache(LoggerFactory.getLogger(WaitAckCacheTest.class), evicted::add);
+    }
+
+    private Tuple mockTuple(String url) {
+        Tuple t = mock(Tuple.class);
+        when(t.getValueByField("url")).thenReturn(url);
+        when(t.getStringByField("url")).thenReturn(url);
+        return t;
+    }
+
+    private static ShardId shardId() {
+        return new ShardId("index", "_na_", 0);
+    }
+
+    private static BulkItemResponse successItem(int itemId, String docId) {
+        IndexResponse indexResponse = new IndexResponse(shardId(), docId, 1, 
1, 1, true);
+        return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, 
indexResponse);
+    }
+
+    private static BulkItemResponse failedItem(int itemId, String docId, 
RestStatus status) {
+        BulkItemResponse.Failure failure =
+                new BulkItemResponse.Failure("index", docId, new 
Exception("test failure"), status);
+        return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, 
failure);
+    }
+
+    private static BulkResponse bulkResponse(BulkItemResponse... items) {
+        return new BulkResponse(items, 10L);
+    }
+
+    @Test
+    void addAndContains() {
+        Tuple t = mockTuple("http://example.com");
+        assertFalse(cache.contains("doc1"));
+
+        cache.addTuple("doc1", t);
+        assertTrue(cache.contains("doc1"));
+        assertEquals(1, cache.estimatedSize());
+    }
+
+    @Test
+    void invalidateRemovesEntry() {
+        Tuple t = mockTuple("http://example.com");
+        cache.addTuple("doc1", t);
+        assertTrue(cache.contains("doc1"));
+
+        cache.invalidate("doc1");
+        assertFalse(cache.contains("doc1"));
+    }
+
+    @Test
+    void processBulkResponse_successfulItem_ackedViaTupleAction() {
+        Tuple t = mockTuple("http://example.com");
+        cache.addTuple("doc1", t);
+
+        BulkResponse response = bulkResponse(successItem(0, "doc1"));
+
+        cache.processBulkResponse(
+                response,
+                1L,
+                null,
+                (id, tuple, selected) -> {
+                    if (!selected.failed) {
+                        acked.add(tuple);
+                    } else {
+                        failed.add(tuple);
+                    }
+                });
+
+        assertEquals(1, acked.size());
+        assertEquals(0, failed.size());
+        assertSame(t, acked.get(0));
+        assertFalse(cache.contains("doc1"));
+    }
+
+    @Test
+    void processBulkResponse_failedItem_failedViaTupleAction() {
+        Tuple t = mockTuple("http://example.com");
+        cache.addTuple("doc1", t);
+
+        BulkResponse response =
+                bulkResponse(failedItem(0, "doc1", 
RestStatus.INTERNAL_SERVER_ERROR));
+
+        cache.processBulkResponse(
+                response,
+                1L,
+                null,
+                (id, tuple, selected) -> {
+                    if (!selected.failed) {
+                        acked.add(tuple);
+                    } else {
+                        failed.add(tuple);
+                    }
+                });
+
+        assertEquals(0, acked.size());
+        assertEquals(1, failed.size());
+        assertSame(t, failed.get(0));
+    }
+
+    @Test
+    void processBulkResponse_conflictIsNotAFailure() {
+        Tuple t = mockTuple("http://example.com");
+        cache.addTuple("doc1", t);
+
+        MultiCountMetric counter = new MultiCountMetric();
+        BulkResponse response = bulkResponse(failedItem(0, "doc1", 
RestStatus.CONFLICT));
+
+        cache.processBulkResponse(
+                response,
+                1L,
+                counter,
+                (id, tuple, selected) -> {
+                    if (!selected.failed) {
+                        acked.add(tuple);
+                    } else {
+                        failed.add(tuple);
+                    }
+                });
+
+        assertEquals(1, acked.size());
+        assertEquals(0, failed.size());
+    }
+
+    @Test
+    void processBulkResponse_multipleTuplesForSameDocId() {
+        Tuple t1 = mockTuple("http://example.com/1");
+        Tuple t2 = mockTuple("http://example.com/2");
+        cache.addTuple("doc1", t1);
+        cache.addTuple("doc1", t2);
+
+        BulkResponse response = bulkResponse(successItem(0, "doc1"));
+
+        cache.processBulkResponse(response, 1L, null, (id, tuple, selected) -> 
acked.add(tuple));
+
+        assertEquals(2, acked.size());
+        assertTrue(acked.contains(t1));
+        assertTrue(acked.contains(t2));
+    }
+
+    @Test
+    void processBulkResponse_duplicateDocIdInBulk_prefersSuccess() {
+        // https://github.com/apache/stormcrawler/issues/832
+        Tuple t = mockTuple("http://example.com");
+        cache.addTuple("doc1", t);
+
+        BulkResponse response =
+                bulkResponse(
+                        failedItem(0, "doc1", 
RestStatus.INTERNAL_SERVER_ERROR),
+                        successItem(1, "doc1"));
+
+        cache.processBulkResponse(
+                response,
+                1L,
+                null,
+                (id, tuple, selected) -> {
+                    if (!selected.failed) {
+                        acked.add(tuple);
+                    } else {
+                        failed.add(tuple);
+                    }
+                });
+
+        assertEquals(1, acked.size());
+        assertEquals(0, failed.size());
+    }
+
+    @Test
+    void processFailedBulk_failsAllMatchingTuples() {
+        Tuple t1 = mockTuple("http://example.com/1");
+        Tuple t2 = mockTuple("http://example.com/2");
+        cache.addTuple("doc1", t1);
+        cache.addTuple("doc2", t2);
+
+        BulkRequest request = new BulkRequest();
+        request.add(new DeleteRequest("index", "doc1"));
+        request.add(new DeleteRequest("index", "doc2"));
+
+        cache.processFailedBulk(request, 1L, new Exception("connection lost"), 
failed::add);
+
+        assertEquals(2, failed.size());
+        assertTrue(failed.contains(t1));
+        assertTrue(failed.contains(t2));
+        assertFalse(cache.contains("doc1"));
+        assertFalse(cache.contains("doc2"));
+    }
+
+    @Test
+    void processFailedBulk_ignoresMissingIds() {
+        Tuple t = mockTuple("http://example.com");
+        cache.addTuple("doc1", t);
+
+        BulkRequest request = new BulkRequest();
+        request.add(new DeleteRequest("index", "doc_unknown"));
+
+        cache.processFailedBulk(request, 1L, new Exception("test"), 
failed::add);
+
+        assertEquals(0, failed.size());
+        // doc1 should still be in cache since it wasn't in the failed request
+        assertTrue(cache.contains("doc1"));
+    }
+
+    @Test
+    void eviction_failsTuplesOnExpiry() {
+        cache =
+                new WaitAckCache(
+                        "expireAfterWrite=1s",
+                        LoggerFactory.getLogger(WaitAckCacheTest.class),
+                        evicted::add);
+        Tuple t = mockTuple("http://example.com");
+        cache.addTuple("doc1", t);
+
+        // Force cache maintenance after expiry by doing a contains() check
+        // which accesses the cache and triggers Caffeine's cleanup
+        await().atMost(5, TimeUnit.SECONDS)

Review Comment:
   Maybe inject a Ticker (via Caffeine.newBuilder().ticker(...)) so tests can 
advance time deterministically without wall-clock waits?



##########
external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.opensearch;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.storm.metric.api.MultiCountMetric;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.jetbrains.annotations.Nullable;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.core.rest.RestStatus;
+import org.slf4j.Logger;
+
+/**
+ * Thread-safe cache that tracks in-flight tuples awaiting bulk acknowledgment 
from OpenSearch.
+ * Provides shared logic for processing bulk responses and failing tuples on 
error, used by
+ * IndexerBolt, DeletionBolt, and StatusUpdaterBolt.
+ */
+public class WaitAckCache {

Review Comment:
   WaitAckCache has no cleanUp() / invalidateAll() method, so none of the 
bolts can call one on shutdown. This means any tuples still in the cache at 
topology teardown won't be explicitly failed; they'll just be abandoned. The 
old code had the same gap, so this is not a regression, but it might be worth 
fixing?



##########
external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.opensearch;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.storm.metric.api.MultiCountMetric;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.jetbrains.annotations.Nullable;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.core.rest.RestStatus;
+import org.slf4j.Logger;
+
+/**
+ * Thread-safe cache that tracks in-flight tuples awaiting bulk acknowledgment 
from OpenSearch.
+ * Provides shared logic for processing bulk responses and failing tuples on 
error, used by
+ * IndexerBolt, DeletionBolt, and StatusUpdaterBolt.
+ */
+public class WaitAckCache {
+
+    /** Callback invoked for each tuple when processing a successful bulk 
response. */
+    @FunctionalInterface
+    public interface TupleAction {
+        void handle(String id, Tuple tuple, BulkItemResponseToFailedFlag 
selected);
+    }
+
+    private final Cache<String, List<Tuple>> cache;
+    private final java.util.concurrent.locks.ReentrantLock lock =
+            new java.util.concurrent.locks.ReentrantLock(true);
+    private final Logger log;
+
+    /** Creates a cache with a fixed 60-second expiry. */
+    public WaitAckCache(Logger log, Consumer<Tuple> onEviction) {
+        this(Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS), 
log, onEviction);
+    }
+
+    /** Creates a cache from a Caffeine spec string (e.g. 
"expireAfterWrite=300s"). */
+    public WaitAckCache(String cacheSpec, Logger log, Consumer<Tuple> 
onEviction) {
+        this(Caffeine.from(cacheSpec), log, onEviction);
+    }
+
+    private WaitAckCache(Caffeine<Object, Object> builder, Logger log, 
Consumer<Tuple> onEviction) {
+        this.log = log;
+        this.cache =
+                builder.<String, List<Tuple>>removalListener(
+                                (String key, List<Tuple> value, RemovalCause 
cause) -> {
+                                    if (!cause.wasEvicted()) {
+                                        return;
+                                    }
+                                    if (value != null) {
+                                        log.error(
+                                                "Purged from waitAck {} with 
{} values",
+                                                key,
+                                                value.size());
+                                        for (Tuple t : value) {
+                                            onEviction.accept(t);
+                                        }
+                                    } else {
+                                        log.error("Purged from waitAck {} with 
no values", key);
+                                    }
+                                })
+                        .build();
+    }
+
+    /** Registers a gauge metric that reports the estimated cache size. */
+    public void registerMetric(TopologyContext context, String name, int 
timeBucketSecs) {
+        context.registerMetric(name, () -> cache.estimatedSize(), 
timeBucketSecs);
+    }
+
+    public long estimatedSize() {
+        return cache.estimatedSize();
+    }
+
+    /** Adds a tuple to the cache under the given document ID, creating the 
list if needed. */
+    public void addTuple(String docID, Tuple tuple) {
+        lock.lock();
+        try {
+            List<Tuple> tt = cache.get(docID, k -> new LinkedList<>());
+            tt.add(tuple);
+            if (log.isDebugEnabled()) {
+                String url = (String) tuple.getValueByField("url");
+                log.debug("Added to waitAck {} with ID {} total {}", url, 
docID, tt.size());
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Returns true if the cache contains an entry for the given document ID. 
*/
+    public boolean contains(String docID) {
+        lock.lock();
+        try {
+            return cache.getIfPresent(docID) != null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Invalidates a single cache entry. */
+    public void invalidate(String docID) {
+        lock.lock();
+        try {
+            cache.invalidate(docID);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Processes a successful bulk response: classifies each item (conflict vs 
failure), retrieves
+     * cached tuples, selects the best response per document ID, and invokes 
the action for each
+     * tuple.
+     *
+     * @param conflictCounter optional metric counter; if non-null, increments 
"doc_conflicts" scope
+     *     for each conflict
+     */
+    public void processBulkResponse(
+            BulkResponse response,
+            long executionId,
+            @Nullable MultiCountMetric conflictCounter,
+            TupleAction action) {
+
+        var idsToBulkItems =
+                Arrays.stream(response.getItems())
+                        .map(
+                                bir -> {
+                                    BulkItemResponse.Failure f = 
bir.getFailure();
+                                    boolean failed = false;
+                                    if (f != null) {
+                                        if 
(f.getStatus().equals(RestStatus.CONFLICT)) {
+                                            if (conflictCounter != null) {
+                                                
conflictCounter.scope("doc_conflicts").incrBy(1);
+                                            }
+                                            log.debug("Doc conflict ID {}", 
bir.getId());
+                                        } else {
+                                            log.error(
+                                                    "Bulk item failure ID {}: 
{}", bir.getId(), f);
+                                            failed = true;
+                                        }
+                                    }
+                                    return new 
BulkItemResponseToFailedFlag(bir, failed);
+                                })
+                        .collect(
+                                // 
https://github.com/apache/stormcrawler/issues/832
+                                Collectors.groupingBy(b -> b.id, 
Collectors.toUnmodifiableList()));
+
+        Map<String, List<Tuple>> presentTuples;
+        long estimatedSize;
+        Set<String> debugInfo = null;
+        lock.lock();
+        try {
+            presentTuples = cache.getAllPresent(idsToBulkItems.keySet());
+            if (!presentTuples.isEmpty()) {
+                cache.invalidateAll(presentTuples.keySet());
+            }
+            estimatedSize = cache.estimatedSize();
+            if (log.isDebugEnabled() && estimatedSize > 0L) {
+                debugInfo = new HashSet<>(cache.asMap().keySet());
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        int ackCount = 0;
+        int failureCount = 0;
+
+        for (var entry : presentTuples.entrySet()) {
+            final var id = entry.getKey();
+            final var tuples = entry.getValue();
+            final var bulkItems = idsToBulkItems.get(id);
+
+            BulkItemResponseToFailedFlag selected = selectBest(bulkItems, id);
+
+            if (tuples != null) {
+                log.debug("Found {} tuple(s) for ID {}", tuples.size(), id);
+                for (Tuple t : tuples) {
+                    if (selected.failed) {
+                        failureCount++;
+                    } else {
+                        ackCount++;
+                    }
+                    action.handle(id, t, selected);
+                }
+            } else {
+                log.warn("Could not find unacked tuples for {}", id);
+            }
+        }
+
+        log.info(
+                "Bulk response [{}] : items {}, waitAck {}, acked {}, failed 
{}",
+                executionId,
+                idsToBulkItems.size(),
+                estimatedSize,
+                ackCount,
+                failureCount);
+
+        if (debugInfo != null) {
+            for (String k : debugInfo) {
+                log.debug("Still in wait ack after bulk response [{}] => {}", 
executionId, k);
+            }
+        }
+    }
+
+    /** Processes a failed bulk request by failing all associated tuples. */
+    public void processFailedBulk(
+            BulkRequest request, long executionId, Throwable failure, 
Consumer<Tuple> failAction) {
+
+        log.error("Exception with bulk {} - failing the whole lot ", 
executionId, failure);
+
+        final var failedIds =
+                request.requests().stream()
+                        .map(DocWriteRequest::id)
+                        .collect(Collectors.toUnmodifiableSet());
+
+        Map<String, List<Tuple>> failedTupleLists;
+        lock.lock();
+        try {
+            failedTupleLists = cache.getAllPresent(failedIds);
+            if (!failedTupleLists.isEmpty()) {
+                cache.invalidateAll(failedTupleLists.keySet());
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        for (var id : failedIds) {
+            var tuples = failedTupleLists.get(id);
+            if (tuples != null) {
+                log.debug("Failed {} tuple(s) for ID {}", tuples.size(), id);
+                for (Tuple t : tuples) {
+                    failAction.accept(t);
+                }
+            } else {
+                log.warn("Could not find unacked tuple for {}", id);
+            }
+        }
+    }
+
+    /**
+     * Selects the best response when there are multiple bulk items for the 
same document ID.
+     * Prefers non-failed responses; warns when there is a mix of success and 
failure.
+     */
+    private BulkItemResponseToFailedFlag selectBest(
+            List<BulkItemResponseToFailedFlag> items, String id) {
+        if (items.size() == 1) {
+            return items.get(0);
+        }
+
+        BulkItemResponseToFailedFlag best = items.get(0);
+        int failedCount = 0;
+        for (var item : items) {
+            if (item.failed) {
+                failedCount++;
+            } else {
+                best = item;
+            }
+        }
+        if (failedCount > 0 && failedCount < items.size()) {

Review Comment:
   If all items are failed, best stays as items.get(0) (a failed item), which 
is actually the correct behaviour, **but** the warning only fires when there's 
a mix. If there are multiple failed items for the same ID, no warning is 
logged and the first one is silently used. Not a bug per se, but worth a 
comment.



##########
external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java:
##########
@@ -184,135 +132,27 @@ public void beforeBulk(long executionId, BulkRequest 
request) {}
 
     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse 
response) {
-        var idsToBulkItemsWithFailedFlag =

Review Comment:
   In the old DeletionBolt, a conflict was silently ignored (just LOG.debug). 
The new shared path in processBulkResponse also only logs at debug. That's 
fine. But the IndexerBolt path previously incremented 
eventCounter.scope("doc_conflicts"); that counter now comes from the 
conflictCounter parameter, which DeletionBolt passes as null. So deletion 
conflicts are no longer counted anywhere. If that's intentional, it should be 
documented; if not, it's a regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to