tmaret commented on a change in pull request #97:
URL: 
https://github.com/apache/sling-org-apache-sling-distribution-journal/pull/97#discussion_r805421130



##########
File path: 
src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -46,15 +50,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber.escapeTopicName;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
 
 @Component(immediate = true)
 @ParametersAreNonnullByDefault
 public class PackageDistributedNotifier implements TopologyChangeHandler {
 
+    public static final String STORE_TYPE_OFFSETS = "lastRaisedEventOffset";
+
     private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
 
+    private final Map<String, LocalStore> map = new HashMap<>();

Review comment:
       Minor point: we could rename this field to `localStores` to capture what 
the map contains.

##########
File path: 
src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -130,6 +150,9 @@ private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
         try {
             Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
+            LocalStore localStore = map.get(pubAgentName);
+            localStore.store(STORE_TYPE_OFFSETS, queueItem.get(QueueItemFactory.RECORD_OFFSET));

Review comment:
       Agreed. To reduce the load on the repository without compromising 
processing guarantees, we should store the `lastRaisedEventOffset` either (a) 
periodically, if it has changed, or (b) after a given number of 
consecutive changes.
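
The two options above can be sketched as follows. This is only an illustration, not code from the pull request: `ThrottledOffsetWriter`, its `record` and `flush` methods, and the `LongConsumer` standing in for `LocalStore.store(...)` are all hypothetical names. It assumes offsets are non-negative and that a periodic task (not shown) would call `flush()` for option (a).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/**
 * Hypothetical helper: keeps the latest offset in memory and forwards it to
 * the (slower) persistent store only occasionally.
 */
final class ThrottledOffsetWriter {

    private final LongConsumer persister;   // stand-in for LocalStore.store(...)
    private final int maxPendingChanges;

    private long lastSeen = -1;             // assumes real offsets are >= 0
    private long lastPersisted = -1;
    private int pendingChanges = 0;

    ThrottledOffsetWriter(LongConsumer persister, int maxPendingChanges) {
        if (maxPendingChanges < 1) {
            throw new IllegalArgumentException("maxPendingChanges must be >= 1");
        }
        this.persister = persister;
        this.maxPendingChanges = maxPendingChanges;
    }

    /** Option (b): persist after a given number of consecutive changes. */
    synchronized void record(long offset) {
        if (offset == lastSeen) {
            return;                         // not a change, nothing to count
        }
        lastSeen = offset;
        pendingChanges++;
        if (pendingChanges >= maxPendingChanges) {
            flush();
        }
    }

    /** Option (a): call from a periodic task; writes only if the offset changed. */
    synchronized void flush() {
        if (lastSeen != lastPersisted) {
            persister.accept(lastSeen);
            lastPersisted = lastSeen;
        }
        pendingChanges = 0;
    }
}

final class ThrottledOffsetWriterDemo {
    public static void main(String[] args) {
        List<Long> writes = new ArrayList<>();
        ThrottledOffsetWriter writer = new ThrottledOffsetWriter(writes::add, 3);
        for (long offset = 10; offset <= 16; offset++) {
            writer.record(offset);          // seven changes, far fewer writes
        }
        writer.flush();                     // e.g. on a timer tick or on deactivate
        System.out.println("persisted offsets: " + writes);
    }
}
```

With either option, the persisted offset can lag behind the last raised event, so after a restart some events may presumably be raised again. Whether that is acceptable depends on the guarantees the notifier is expected to provide.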

##########
File path: 
src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -130,6 +150,9 @@ private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
         try {
             Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
+            LocalStore localStore = map.get(pubAgentName);
+            localStore.store(STORE_TYPE_OFFSETS, queueItem.get(QueueItemFactory.RECORD_OFFSET));
+            map.put(pubAgentName, localStore);

Review comment:
       Putting the entry back into the map is not necessary, since the code 
obtains a reference to the entry and updates it directly.
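
A minimal illustration of why the extra `put` is redundant: `Map.get` returns a reference to the stored object, so mutating that object is visible through the map without re-inserting it. `Holder` is a hypothetical stand-in for `LocalStore`, used only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class MapReferenceDemo {

    /** Hypothetical stand-in for LocalStore: a mutable holder of one value. */
    static final class Holder {
        long offset = -1;
    }

    /** Updates the entry through the reference only, without calling put again. */
    static long updateWithoutPut() {
        Map<String, Holder> stores = new HashMap<>();
        stores.put("agent1", new Holder());

        Holder holder = stores.get("agent1"); // reference to the stored object
        holder.offset = 42;                   // mutates the object held by the map

        return stores.get("agent1").offset;   // the map sees the update
    }

    public static void main(String[] args) {
        System.out.println(updateWithoutPut());
    }
}
```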

##########
File path: 
src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -91,6 +103,14 @@ public void changed(TopologyViewDiff diffView) {
      */
     private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
         long minOffset = offsets.get().findFirst().getAsLong();
+        if (!map.containsKey(pubAgentName)) {

Review comment:
       The `containsKey` and `put` calls could be replaced with a single 
`computeIfAbsent` invocation, which would simplify the code.
   
   ```
   LocalStore store = map.computeIfAbsent(pubAgentName, this::newLocalStore);
   ```
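
For comparison, here is a self-contained sketch of both forms side by side. `Store` and `newStore` are hypothetical placeholders for `LocalStore` and the `newLocalStore` factory suggested above; the actual factory signature in the pull request may differ.

```java
import java.util.HashMap;
import java.util.Map;

final class ComputeIfAbsentDemo {

    /** Hypothetical placeholder for LocalStore. */
    static final class Store {
        final String agent;
        Store(String agent) {
            this.agent = agent;
        }
    }

    /** Hypothetical factory, playing the role of newLocalStore(pubAgentName). */
    static Store newStore(String agent) {
        return new Store(agent);
    }

    /** Check-then-put form: up to three map operations per lookup. */
    static Store lookupVerbose(Map<String, Store> map, String agent) {
        if (!map.containsKey(agent)) {
            map.put(agent, newStore(agent));
        }
        return map.get(agent);
    }

    /** Single call: creates the store only when the key has no mapping. */
    static Store lookupCompact(Map<String, Store> map, String agent) {
        return map.computeIfAbsent(agent, ComputeIfAbsentDemo::newStore);
    }

    public static void main(String[] args) {
        Map<String, Store> map = new HashMap<>();
        Store first = lookupCompact(map, "agent1");
        Store second = lookupCompact(map, "agent1");
        System.out.println(first == second); // same instance is reused
    }
}
```

Note that `HashMap` is not thread-safe in either form. If the map can be reached from several threads, a `ConcurrentHashMap` would make the `computeIfAbsent` call atomic.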




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
