mreutegg commented on code in PR #920:
URL: https://github.com/apache/jackrabbit-oak/pull/920#discussion_r1205066097


##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java:
##########
@@ -1270,6 +1361,86 @@ private Map<String, NodeDocument> 
getCachedNodes(Set<String> keys) {
         return nodes;
     }
 
+    @NotNull
+    private <T extends Document> Map<UpdateOp, T> bulkModify(final 
Collection<T> collection, final List<UpdateOp> updateOps,
+                                                             final Map<String, 
T> oldDocs) {
+        Map<String, UpdateOp> bulkOperations = createMap(updateOps);
+        Set<String> lackingDocs = difference(bulkOperations.keySet(), 
oldDocs.keySet());
+        oldDocs.putAll(findDocuments(collection, lackingDocs));
+
+        CacheChangesTracker tracker = null;
+        if (collection == NODES) {
+            tracker = nodesCache.registerTracker(bulkOperations.keySet());
+        }
+
+        try {
+            final BulkRequestResult bulkResult = sendBulkRequest(collection, 
bulkOperations.values(), oldDocs, false);
+            final Set<String> potentiallyUpdatedDocsSet = 
difference(bulkOperations.keySet(), bulkResult.failedUpdates);
+
+            // fetch all the docs which haven't failed, they might have passed
+            final Map<String, T> updatedDocsMap = findDocuments(collection, 
potentiallyUpdatedDocsSet);
+
+            if (collection == NODES) {
+
+                List<NodeDocument> docsToCache = new ArrayList<>();
+
+                if (bulkResult.modifiedCount == 
potentiallyUpdatedDocsSet.size()) {
+                    // all documents had been updated, now we can simply
+                    // apply the update op on oldDocs and update the cache
+                    potentiallyUpdatedDocsSet.forEach(key -> {
+                        T oldDoc = oldDocs.get(key);
+                        if (oldDoc != null && oldDoc != NULL) {
+                            NodeDocument newDoc = (NodeDocument) 
applyChanges(collection, oldDoc, bulkOperations.get(key));
+                            docsToCache.add(newDoc);
+                        }
+                    });
+                } else {
+                    // some documents might have not been updated, lets fetch 
them from database
+                    // and found out which had not been updated
+                    updatedDocsMap.forEach((key, value) -> {
+                        T oldDoc = oldDocs.get(key);
+                        if (isNull(oldDoc) || oldDoc == NULL || 
Objects.equals(oldDoc.getModCount(), value.getModCount())) {
+                            // simply ignore updating the document cache in 
case
+                            // 1. oldDoc is null
+                            // 2. document didn't get updated i.e. modCount is 
same after update operation
+                            log("Skipping updating doc cache for {}", key);

Review Comment:
   The log() method does not support SLF4J-style `{}` placeholder markers in the message, so `key` will not be substituted into the logged string here.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java:
##########
@@ -1270,6 +1361,86 @@ private Map<String, NodeDocument> 
getCachedNodes(Set<String> keys) {
         return nodes;
     }
 
+    @NotNull
+    private <T extends Document> Map<UpdateOp, T> bulkModify(final 
Collection<T> collection, final List<UpdateOp> updateOps,
+                                                             final Map<String, 
T> oldDocs) {
+        Map<String, UpdateOp> bulkOperations = createMap(updateOps);
+        Set<String> lackingDocs = difference(bulkOperations.keySet(), 
oldDocs.keySet());
+        oldDocs.putAll(findDocuments(collection, lackingDocs));
+
+        CacheChangesTracker tracker = null;
+        if (collection == NODES) {
+            tracker = nodesCache.registerTracker(bulkOperations.keySet());
+        }
+
+        try {
+            final BulkRequestResult bulkResult = sendBulkRequest(collection, 
bulkOperations.values(), oldDocs, false);
+            final Set<String> potentiallyUpdatedDocsSet = 
difference(bulkOperations.keySet(), bulkResult.failedUpdates);
+
+            // fetch all the docs which haven't failed, they might have passed
+            final Map<String, T> updatedDocsMap = findDocuments(collection, 
potentiallyUpdatedDocsSet);
+
+            if (collection == NODES) {
+
+                List<NodeDocument> docsToCache = new ArrayList<>();
+
+                if (bulkResult.modifiedCount == 
potentiallyUpdatedDocsSet.size()) {
+                    // all documents had been updated, now we can simply
+                    // apply the update op on oldDocs and update the cache
+                    potentiallyUpdatedDocsSet.forEach(key -> {
+                        T oldDoc = oldDocs.get(key);
+                        if (oldDoc != null && oldDoc != NULL) {
+                            NodeDocument newDoc = (NodeDocument) 
applyChanges(collection, oldDoc, bulkOperations.get(key));
+                            docsToCache.add(newDoc);
+                        }
+                    });
+                } else {
+                    // some documents might have not been updated, lets fetch 
them from database
+                    // and found out which had not been updated
+                    updatedDocsMap.forEach((key, value) -> {
+                        T oldDoc = oldDocs.get(key);
+                        if (isNull(oldDoc) || oldDoc == NULL || 
Objects.equals(oldDoc.getModCount(), value.getModCount())) {
+                            // simply ignore updating the document cache in 
case
+                            // 1. oldDoc is null
+                            // 2. document didn't get updated i.e. modCount is 
same after update operation
+                            log("Skipping updating doc cache for {}", key);
+                        } else {
+                            NodeDocument newDoc = (NodeDocument) 
applyChanges(collection, oldDoc, bulkOperations.get(key));
+                            docsToCache.add(newDoc);
+                        }
+                    });
+                }
+                nodesCache.putNonConflictingDocs(tracker, docsToCache);
+            }
+            oldDocs.keySet().removeAll(bulkResult.failedUpdates);
+
+            final Map<UpdateOp, T> result = new HashMap<>(oldDocs.size());
+
+            // document might have been updated, if updated then add oldDoc 
else add null to result
+            bulkOperations.entrySet().stream().filter(e -> 
!bulkResult.failedUpdates.contains(e.getKey())).forEach(e -> {
+                T updated = updatedDocsMap.get(e.getKey());
+                T oldDoc = oldDocs.get(e.getKey());
+                if (oldDoc == null || oldDoc == NULL || 
Objects.equals(oldDoc.getModCount(), updated.getModCount())) {
+                    // add null value in result cause, either this document 
didn't exist
+                    // at time of modify operation, and we didn't anything for 
it
+                    // or oldDoc is present and modCount is same,
+                    // so document had not been updated.
+                    log("{} didn't get updated, returning null.", e.getKey());

Review Comment:
   Same as above: the log() method does not support `{}` placeholder markers in the message, so `e.getKey()` will not be substituted into the logged string.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to