mreutegg commented on code in PR #920:
URL: https://github.com/apache/jackrabbit-oak/pull/920#discussion_r1205066097
##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java:
##########
@@ -1270,6 +1361,86 @@ private Map<String, NodeDocument>
getCachedNodes(Set<String> keys) {
return nodes;
}
+ @NotNull
+ private <T extends Document> Map<UpdateOp, T> bulkModify(final
Collection<T> collection, final List<UpdateOp> updateOps,
+ final Map<String,
T> oldDocs) {
+ Map<String, UpdateOp> bulkOperations = createMap(updateOps);
+ Set<String> lackingDocs = difference(bulkOperations.keySet(),
oldDocs.keySet());
+ oldDocs.putAll(findDocuments(collection, lackingDocs));
+
+ CacheChangesTracker tracker = null;
+ if (collection == NODES) {
+ tracker = nodesCache.registerTracker(bulkOperations.keySet());
+ }
+
+ try {
+ final BulkRequestResult bulkResult = sendBulkRequest(collection,
bulkOperations.values(), oldDocs, false);
+ final Set<String> potentiallyUpdatedDocsSet =
difference(bulkOperations.keySet(), bulkResult.failedUpdates);
+
+ // fetch all the docs which haven't failed, they might have passed
+ final Map<String, T> updatedDocsMap = findDocuments(collection,
potentiallyUpdatedDocsSet);
+
+ if (collection == NODES) {
+
+ List<NodeDocument> docsToCache = new ArrayList<>();
+
+ if (bulkResult.modifiedCount ==
potentiallyUpdatedDocsSet.size()) {
+ // all documents had been updated, now we can simply
+ // apply the update op on oldDocs and update the cache
+ potentiallyUpdatedDocsSet.forEach(key -> {
+ T oldDoc = oldDocs.get(key);
+ if (oldDoc != null && oldDoc != NULL) {
+ NodeDocument newDoc = (NodeDocument)
applyChanges(collection, oldDoc, bulkOperations.get(key));
+ docsToCache.add(newDoc);
+ }
+ });
+ } else {
+ // some documents might have not been updated, lets fetch
them from database
+ // and found out which had not been updated
+ updatedDocsMap.forEach((key, value) -> {
+ T oldDoc = oldDocs.get(key);
+ if (isNull(oldDoc) || oldDoc == NULL ||
Objects.equals(oldDoc.getModCount(), value.getModCount())) {
+ // simply ignore updating the document cache in
case
+ // 1. oldDoc is null
+ // 2. document didn't get updated i.e. modCount is
same after update operation
+ log("Skipping updating doc cache for {}", key);
Review Comment:
The log() method does not support `{}` placeholder markers in the message, so the `{}` here will not be replaced by the key.
##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java:
##########
@@ -1270,6 +1361,86 @@ private Map<String, NodeDocument>
getCachedNodes(Set<String> keys) {
return nodes;
}
+ @NotNull
+ private <T extends Document> Map<UpdateOp, T> bulkModify(final
Collection<T> collection, final List<UpdateOp> updateOps,
+ final Map<String,
T> oldDocs) {
+ Map<String, UpdateOp> bulkOperations = createMap(updateOps);
+ Set<String> lackingDocs = difference(bulkOperations.keySet(),
oldDocs.keySet());
+ oldDocs.putAll(findDocuments(collection, lackingDocs));
+
+ CacheChangesTracker tracker = null;
+ if (collection == NODES) {
+ tracker = nodesCache.registerTracker(bulkOperations.keySet());
+ }
+
+ try {
+ final BulkRequestResult bulkResult = sendBulkRequest(collection,
bulkOperations.values(), oldDocs, false);
+ final Set<String> potentiallyUpdatedDocsSet =
difference(bulkOperations.keySet(), bulkResult.failedUpdates);
+
+ // fetch all the docs which haven't failed, they might have passed
+ final Map<String, T> updatedDocsMap = findDocuments(collection,
potentiallyUpdatedDocsSet);
+
+ if (collection == NODES) {
+
+ List<NodeDocument> docsToCache = new ArrayList<>();
+
+ if (bulkResult.modifiedCount ==
potentiallyUpdatedDocsSet.size()) {
+ // all documents had been updated, now we can simply
+ // apply the update op on oldDocs and update the cache
+ potentiallyUpdatedDocsSet.forEach(key -> {
+ T oldDoc = oldDocs.get(key);
+ if (oldDoc != null && oldDoc != NULL) {
+ NodeDocument newDoc = (NodeDocument)
applyChanges(collection, oldDoc, bulkOperations.get(key));
+ docsToCache.add(newDoc);
+ }
+ });
+ } else {
+ // some documents might have not been updated, lets fetch
them from database
+ // and found out which had not been updated
+ updatedDocsMap.forEach((key, value) -> {
+ T oldDoc = oldDocs.get(key);
+ if (isNull(oldDoc) || oldDoc == NULL ||
Objects.equals(oldDoc.getModCount(), value.getModCount())) {
+ // simply ignore updating the document cache in
case
+ // 1. oldDoc is null
+ // 2. document didn't get updated i.e. modCount is
same after update operation
+ log("Skipping updating doc cache for {}", key);
+ } else {
+ NodeDocument newDoc = (NodeDocument)
applyChanges(collection, oldDoc, bulkOperations.get(key));
+ docsToCache.add(newDoc);
+ }
+ });
+ }
+ nodesCache.putNonConflictingDocs(tracker, docsToCache);
+ }
+ oldDocs.keySet().removeAll(bulkResult.failedUpdates);
+
+ final Map<UpdateOp, T> result = new HashMap<>(oldDocs.size());
+
+ // document might have been updated, if updated then add oldDoc
else add null to result
+ bulkOperations.entrySet().stream().filter(e ->
!bulkResult.failedUpdates.contains(e.getKey())).forEach(e -> {
+ T updated = updatedDocsMap.get(e.getKey());
+ T oldDoc = oldDocs.get(e.getKey());
+ if (oldDoc == null || oldDoc == NULL ||
Objects.equals(oldDoc.getModCount(), updated.getModCount())) {
+ // add null value in result cause, either this document
didn't exist
+ // at time of modify operation, and we didn't anything for
it
+ // or oldDoc is present and modCount is same,
+ // so document had not been updated.
+ log("{} didn't get updated, returning null.", e.getKey());
Review Comment:
Same as above: the log() method does not support `{}` placeholder markers in the message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]