mreutegg commented on a change in pull request #247:
URL: https://github.com/apache/jackrabbit-oak/pull/247#discussion_r481165574



##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
##########
@@ -376,7 +376,7 @@ static String arrayToCsv(Integer[] arr) {
      * internal reader of an existing clusterView document from the settings
      * collection
      **/
-    private static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) {
+    static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) {

Review comment:
       We can avoid opening up this method by reading the data in the `clusterNodes` collection via ClusterNodeInfoDocument. See my comment in Sweep2Helper.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
##########
@@ -483,6 +483,15 @@ static boolean matches(Integer[] expected, Set<Integer> actual) {
         return new HashSet<Integer>(Arrays.asList(inactiveIds));
     }
 
+    /** Returns the set of active, recovering and inactive ids of this cluster view **/
+    Set<Integer> getAllIds() {

Review comment:
       This method is never used. Remove?
   
   The current set of clusterIds is also available in 
DocumentNodeStore#clusterNodes as keys.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper methods for sweep2 functionality introduced with OAK-9176.
+ * Kept separate from DocumentNodeStore to limit its size.
+ */
+public class Sweep2Helper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Sweep2Helper.class);
+
+    static boolean isSweep2Necessary(DocumentStore store) {
+        NodeDocument rootNodeDoc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
+        if (rootNodeDoc == null) {
+            // that'd be very weird
+            LOG.warn("isSweep2Necessary : cannot get root node - assuming no sweep2 needed");
+            return false;
+        }
+    
+        if (rootNodeDoc.get("_sweepRev") == null) {
+            // this indicates a pre 1.8 repository upgrade (case 1).
+            // no sweep2 is needed as it is embedded in the normal sweep[1].
+            return false;
+        }
+    
+        // in this case we have a post (>=) 1.8 repository
+        // which might or might not have previously been a pre (<) 1.8
+        // and we need to distinguish those 2 cases - which, to repeat, are:
+        // 2) Oak >= 1.8 which never did an Oak <= 1.6 upgrade:
+        //    -> no sweep2 is needed as OAK-9176 doesn't apply (the repository
+        //       never ran <= 1.6)
+        // 3) Oak >= 1.8 which was previously doing an Oak <= 1.6 upgrade:
+        //    -> A (full) sweep2 is needed. This is the main case of OAK-9176.
+        Map<Revision, String> bcValueMap = rootNodeDoc.getValueMap("_bc");
+        Map<Revision, String> valueMap = rootNodeDoc.getValueMap(NodeDocument.REVISIONS);
+        for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
+            Revision rev = entry.getKey();
+    
+            // consider all clusterIds..
+            String rawCommitValue = entry.getValue();
+            String cv = rawCommitValue;
+            if (cv == null) {
+                // skip
+                continue;
+            }
+            Revision cRev = Utils.resolveCommitRevision(rev, cv);
+            if (cRev.equals(rev)) {
+                // fine
+                continue;
+            } else {
+                // then rev should be in the branch commit list
+                if (bcValueMap.containsKey(rev)) {
+                    // all good
+                    continue;
+                }
+                // otherwise the "_bc" does *not* contain a branch commit
+                // which we suspect it should contain.
+                // that is an indicator of requiring a sweep2
+                // it might not, however, be sufficient, ie it might (really?)
+                // be that it got garbage collected away - in which
+                // case we'd be doing an unnecessary sweep2.
+                // so this is case 3)
+                return true;
+            }
+        }
+    
+        // this is case 2
+        return false;
+    }
+
+    /**
+     * Acquires a cluster singleton lock for doing a sweep2 if a sweep2 is necessary.
+     * If instructed this will check whether a sweep2 is necessary at all by
+     * inspecting the relevant node document(s).
+     * @param checkIfSweep2Necessary true permits this method to check
+     * whether sweep2 is necessary, false instructs this method to assume a sweep2 is necessary
+     * @return <ul>
+     * <li>
+     * &gt;0: the lock was successfully acquired and a sweep2 must now be done
+     * by the local instance. The returned value represents a simple lock value
+     * which needs to be provided for a successful unlock later on.
+     * </li>
+     * <li>
+     * 0: a sweep2 maybe must be done, but cannot at this point. A later retry should be done.
+     * This can happen when another instance is busy doing a sweep2 (and we
+     * monitor that other instance until it is done) - or because no discovery-lite
+     * status is available yet (so we don't know if the current owner of the sweep2 status
+     * crashed or not and how the local instance fits into the picture)
+     * </li>
+     * <li>
+     * -1: no sweep2 is necessary
+     * </li>
+     * </ul>
+     */
+    static long acquireSweep2LockIfNecessary(DocumentNodeStore ns, int clusterId, boolean checkIfSweep2Necessary) {
+        Sweep2StatusDocument status = Sweep2StatusDocument.readFrom(ns.getDocumentStore());
+        if (status != null && status.isSwept()) {
+            // nothing left to do.
+            // this should be the most frequent case.
+            return -1;
+        }
+    
+        if (status == null || !status.isSweeping()) {
+            // unknown or invalid status, derive from root document if told to check
+            if (checkIfSweep2Necessary && !isSweep2Necessary(ns.getDocumentStore())) {
+                // if no sweep2 is necessary, then mark it so in the settings collection.
+                if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(ns.getDocumentStore(), clusterId)) {
+                    LOG.error("acquireSweep2LockIfNecessary: could not set the sweep2 status. Sweep2 not necessary though.");
+                }
+                // if we concluded that no sweep2 is necessary but failed to update
+                // the status in the settings collection we should continue anyway
+                // (without doing a sweep2 in the background)
+                return -1;
+            } else {
+                return Sweep2StatusDocument.acquireSweep2Lock(ns, clusterId);
+            }
+        }
+        // otherwise the status is "sweeping", which could be by ourselves or by another instance
+    
+        int lockClusterId = status.getLockClusterId();
+        if (lockClusterId == clusterId) {
+            // the local instance was the originator of the sweeping lock, but likely crashed
+            // hence we need to redo the work from scratch as we can't know if we finished it properly
+            LOG.info("acquireSweep2LockIfNecessary : sweep2 status was sweeping, locked by own instance ({}). "
+                    + "Another sweep2 is required.",
+                    clusterId);
+            return status.getLockValue();
+        }
+    
+        // another instance marked as sweeping - check to see if it is still active or it might have crashed
+        ClusterViewDocument clusterViewDoc = ClusterViewDocument.doRead(ns);
+        if (clusterViewDoc == null) {
+            // no clusterView yet - we need to let the caller retry later.
+            // this can happen if the DocumentDiscoveryLiteService wasn't
+            // activated yet or its BackgroundWorker didn't get to
+            // check the view/update it yet.
+            // since that is an async operation, this is a legitimate situation.
+            // but instead of retrying within this method, we leave that to the
+            // caller of this method - hence returning that special value of 0 here.
+            LOG.info("acquireSweep2LockIfNecessary : no clusterView yet, need to determine status later");
+            return 0;
+        }
+    
+        if (clusterViewDoc.getActiveIds().contains(lockClusterId)) {
+            // then another instance is busy sweep2-ing, which is fine.
+            // but we should continue monitoring until that other instance is done
+            LOG.debug("acquireSweep2LockIfNecessary : another instance (id {}) is (still) busy doing a sweep2.",
+                    lockClusterId);
+            return 0;
+        }

Review comment:
       I suggest replacing this with:
   ```suggestion
           if (ClusterNodeInfoDocument.all(ns.getDocumentStore()).stream()
                   .anyMatch(info -> info.getClusterId() == lockClusterId && info.isActive())) {
               // then another instance is busy sweep2-ing, which is fine.
               // but we should continue monitoring until that other instance is done
               LOG.debug("acquireSweep2LockIfNecessary : another instance (id {}) is (still) busy doing a sweep2.",
                       lockClusterId);
               return 0;
           }
   ```
   

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper2.java
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.partition;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeCommitRoot;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeRevision;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setDeletedOnce;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.COMMITROOT_OR_REVISIONS;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.commons.TimeDurationFormatter;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+
+/**
+ * The {@code NodeDocumentSweeper2} is used for the so-called sweep2, which is
+ * a repository traversal updating documents that have missing branch commit ("_bc")
+ * properties (details see OAK-9176).
+ * This class is similar to NodeDocumentSweeper as it is based on the same principles,
+ * with a few notable exceptions (like it only looks at _commitRoot and _revisions etc).
+ * And due to these exceptions the class is forked rather than modified/subclasses
+ * (also to enable later refactoring of the NodeDocumentSweeper itself).
+ * <p>
+ * This class is not thread-safe.
+ */
+final class NodeDocumentSweeper2 {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NodeDocumentSweeper2.class);
+
+    private static final int YIELD_SIZE = 500;
+
+    private static final int INVALIDATE_BATCH_SIZE = 100;
+
+    private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
+
+    private final RevisionContext context;
+
+    private final CommitValueResolver commitValueResolver;
+
+    private final int clusterId;
+
+    private final RevisionVector headRevision;
+
+    private final boolean sweepNewerThanHead;
+
+    private Revision head;
+
+    private long totalCount;
+    private long lastCount;
+    private long startOfScan;
+    private long lastLog;
+
+
+    /**
+     * Creates a new sweeper for the given context. The sweeper is initialized
+     * in the constructor with the head revision provided by the revision
+     * context. This is the head revision used later when the documents are
+     * check for uncommitted changes in
+     * {@link #sweep(Iterable, NodeDocumentSweepListener)}.
+     * <p>
+     * In combination with {@code sweepNewerThanHead == false}, the revision
+     * context may return a head revision that is not up-to-date, as long as it
+     * is consistent with documents passed to the {@code sweep()} method. That
+     * is, the documents must reflect all changes visible from the provided head
+     * revision. The sweeper will then only revert uncommitted changes up to the
+     * head revision. With {@code sweepNewerThanHead == true}, the sweeper will
+     * also revert uncommitted changes that are newer than the head revision.
+     * This is usually only useful during recovery of a cluster node, when it is
+     * guaranteed that there are no in-progress commits newer than the current
+     * head revision.
+     *
+     * @param context the revision context.
+     * @param sweepNewerThanHead whether uncommitted changes newer than the head
+     *                 revision should be reverted.
+     */
+    NodeDocumentSweeper2(RevisionContext context,
+                        CommitValueResolver commitValueResolver,
+                        boolean sweepNewerThanHead) {
+        this.context = checkNotNull(context);
+        this.commitValueResolver = checkNotNull(commitValueResolver);
+        this.clusterId = context.getClusterId();
+        this.headRevision= context.getHeadRevision();
+        this.sweepNewerThanHead = sweepNewerThanHead;
+    }
+
+    /**
+     * Performs a sweep and reports the required updates to the given sweep
+     * listener. The returned revision is the new sweep revision for the
+     * clusterId associated with the revision context used to create this
+     * sweeper. The caller is responsible for storing the returned sweep
+     * revision on the root document. This method returns {@code null} if no
+     * update was possible.
+     *
+     * @param documents the documents to sweep
+     * @param listener the listener to receive required sweep update operations.
+     * @return the new sweep revision or {@code null} if no updates were done.
+     * @throws DocumentStoreException if reading from the store or writing to
+     *          the store failed.
+     */
+    @Nullable

Review comment:
       `@Nullable` does not apply to a void return type.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
##########
@@ -2493,6 +2541,114 @@ private void maybeRefreshHeadRevision() {
 
     }
 
+    /**
+     * Executes the sweep2 (from within a background thread)
+     * @param sweep2Lock the lock value originally acquired
+     * @return true if sweep2 is done or no longer needed, false otherwise (in which case it should be retried)
+     * @throws DocumentStoreException
+     */
+    private boolean backgroundSweep2(long sweep2Lock) throws DocumentStoreException {
+        if (sweep2Lock == 0) {
+            sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(this, clusterId,
+                    false /* no check required as we know in backgroundSweep2 that it is necessary */);
+            if (sweep2Lock == 0) {
+                // still not well defined, retry in a minute (done in BackgroundSweep2Operation)
+                return false;
+            }
+            if (sweep2Lock == -1) {
+                // then we're done
+                return true;
+            }
+        }
+        // sweep2Lock > 0
+
+        // at this point we did properly acquire a lock and can go ahead doing sweep2
+        LOG.info("backgroundSweep2: starting sweep2.");
+        int num = forceBackgroundSweep2(new Revision(0, 0, clusterId));
+        LOG.info("backgroundSweep2: finished sweep2, num swept=" + num);
+
+        // release the lock.
+        // Note that in theory someone else could have released our lock, or that
+        // the sweep2 status was deleted - that actually doesn't matter:
+        // we just went through a full, successful sweep2 and we want to record it
+        // that way - irrespective of any interference with the status
+        // -> hence the 'force' aspect of releasing here
+        if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId)) {
+            LOG.error("backgroundSweep2 : sweep2 finished but we failed to update the sweep2 status accordingly");
+        }
+        return true;
+    }
+
+    private int forceBackgroundSweep2(Revision startRev) throws DocumentStoreException {

Review comment:
       Why is there a Revision argument? It is always `new Revision(0, 0, clusterId)`.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java
##########
@@ -0,0 +1,180 @@
+package org.apache.jackrabbit.oak.plugins.document;

Review comment:
       License is missing.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper2.java
##########
@@ -0,0 +1,357 @@
+/*

Review comment:
       General comment about this class: there is a lot of code duplication.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper2.java
##########
@@ -0,0 +1,357 @@
+    /**
+     * Performs a sweep and reports the required updates to the given sweep
+     * listener. The returned revision is the new sweep revision for the
+     * clusterId associated with the revision context used to create this
+     * sweeper. The caller is responsible for storing the returned sweep
+     * revision on the root document. This method returns {@code null} if no
+     * update was possible.
+     *
+     * @param documents the documents to sweep
+     * @param listener the listener to receive required sweep update operations.
+     * @return the new sweep revision or {@code null} if no updates were done.

Review comment:
       This method does not return anything.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
##########
@@ -106,6 +108,26 @@ public boolean apply(@Nullable String input) {
         }
     };
 
+    /**
+     * A predicate for property and _deleted names.

Review comment:
       This Javadoc needs an update. It was copy/pasted from the existing predicate.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
##########
@@ -718,6 +756,15 @@ public int getMemory() {
 
             backgroundUpdateThread.start();
             backgroundSweepThread.start();
+
+            if (sweep2Lock >= 0) {
+                // sweep2 is necessary - so start a sweep2 background task
+                backgroundSweep2Thread = new Thread(
+                        new BackgroundSweep2Operation(this, isDisposed, sweep2Lock),
+                        "DocumentNodeStore background sweep2 thread " + threadNamePostfix);
+                backgroundSweep2Thread.setDaemon(true);

Review comment:
       I would rather move the thread creation up to where all the other background threads are created and only start the thread here if needed.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
##########
@@ -983,7 +1005,9 @@ public static void alignWithExternalRevisions(@NotNull NodeDocument rootDoc,
     public static void joinQuietly(Thread... threads) {
         for (Thread t : threads) {
             try {
-                t.join();
+                if (t != null) {
+                    t.join();
+                }

Review comment:
       My preference is to revert this change and instead always create the 
backgroundSweep2Thread. See other comment in DocumentNodeStore.
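
A minimal, self-contained sketch of that approach, not the actual DocumentNodeStore code. The class name `BackgroundThreadSketch`, the helper `newDaemon` and the flag `sweep2Needed` are hypothetical stand-ins, and the `catch` clause in `joinQuietly` is an assumption because the diff above does not show it. The point it illustrates: the thread is always created, so the field is never null, and `Thread.join()` on a thread that was never started returns immediately, so the null check is not needed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BackgroundThreadSketch {

    // Mirrors the loop shown in the diff, without the null check.
    // The catch clause is an assumption; the diff does not show it.
    static void joinQuietly(Thread... threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // ignore, as the method name suggests
            }
        }
    }

    // Hypothetical helper: creates a daemon thread but does not start it.
    static Thread newDaemon(Runnable task, String name) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        AtomicBoolean ran = new AtomicBoolean(false);

        // Always create the thread together with the other background threads ...
        Thread sweep2Thread = newDaemon(() -> ran.set(true), "background sweep2 thread");

        // ... but only start it when a sweep2 is actually needed.
        boolean sweep2Needed = false; // hypothetical stand-in for 'sweep2Lock >= 0'
        if (sweep2Needed) {
            sweep2Thread.start();
        }

        // Joining a never-started thread returns immediately,
        // so dispose does not need a null check.
        joinQuietly(sweep2Thread);
        System.out.println("joined, task ran: " + ran.get());
    }
}
```

With this shape the change to `joinQuietly` can be reverted as is, and the start-up code only decides whether to call `start()`.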

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
##########
@@ -106,6 +108,26 @@ public boolean apply(@Nullable String input) {
         }
     };
 
+    /**
+     * A predicate for property and _deleted names.
+     */
+    public static final Predicate<String> PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS = new Predicate<String>() {
+        @Override
+        public boolean apply(@Nullable String input) {
+            return Utils.isPropertyName(input) || isDeletedEntry(input) || isCommitRootEntry(input) || isRevisionsEntry(input);
+        }
+    };
+
+    /**
+     * A predicate for property and _deleted names.

Review comment:
       This Javadoc needs an update. It was copy/pasted from the existing predicate.

##########
File path: 
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper2.java
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.partition;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.Maps.newHashMap;
+import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
+import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeCommitRoot;
+import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeRevision;
+import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.setDeletedOnce;
+import static 
org.apache.jackrabbit.oak.plugins.document.util.Utils.COMMITROOT_OR_REVISIONS;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.commons.TimeDurationFormatter;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+
+/**
+ * The {@code NodeDocumentSweeper2} is used for the so-called sweep2, which is
+ * a repository traversal updating documents that have missing branch commit 
("_bc") 
+ * properties (details see OAK-9176).
+ * This class is similar to NodeDocumentSweeper as it is based on the same 
principles,
+ * with a few notable exceptions (like it only looks at _commitRoot and 
_revisions etc).
+ * And due to these exceptions the class is forked rather than 
modified/subclasses
+ * (also to enable later refactoring of the NodeDocumentSweeper itself).
+ * <p>
+ * This class is not thread-safe.
+ */
+final class NodeDocumentSweeper2 {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NodeDocumentSweeper2.class);
+
+    private static final int YIELD_SIZE = 500;
+
+    private static final int INVALIDATE_BATCH_SIZE = 100;
+
+    private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
+
+    private final RevisionContext context;
+
+    private final CommitValueResolver commitValueResolver;
+
+    private final int clusterId;
+
+    private final RevisionVector headRevision;
+
+    private final boolean sweepNewerThanHead;
+
+    private Revision head;
+
+    private long totalCount;
+    private long lastCount;
+    private long startOfScan;
+    private long lastLog;
+
+
+    /**
+     * Creates a new sweeper for the given context. The sweeper is initialized
+     * in the constructor with the head revision provided by the revision
+     * context. This is the head revision used later when the documents are
+     * check for uncommitted changes in
+     * {@link #sweep(Iterable, NodeDocumentSweepListener)}.
+     * <p>
+     * In combination with {@code sweepNewerThanHead == false}, the revision
+     * context may return a head revision that is not up-to-date, as long as it
+     * is consistent with documents passed to the {@code sweep()} method. That
+     * is, the documents must reflect all changes visible from the provided head
+     * revision. The sweeper will then only revert uncommitted changes up to the
+     * head revision. With {@code sweepNewerThanHead == true}, the sweeper will
+     * also revert uncommitted changes that are newer than the head revision.
+     * This is usually only useful during recovery of a cluster node, when it is
+     * guaranteed that there are no in-progress commits newer than the current
+     * head revision.
+     *
+     * @param context the revision context.
+     * @param sweepNewerThanHead whether uncommitted changes newer than the head
+     *                 revision should be reverted.
+     */
+    NodeDocumentSweeper2(RevisionContext context,
+                        CommitValueResolver commitValueResolver,
+                        boolean sweepNewerThanHead) {
+        this.context = checkNotNull(context);
+        this.commitValueResolver = checkNotNull(commitValueResolver);
+        this.clusterId = context.getClusterId();
+        this.headRevision= context.getHeadRevision();
+        this.sweepNewerThanHead = sweepNewerThanHead;
+    }
+
+    /**
+     * Performs a sweep and reports the required updates to the given sweep
+     * listener. The returned revision is the new sweep revision for the
+     * clusterId associated with the revision context used to create this
+     * sweeper. The caller is responsible for storing the returned sweep
+     * revision on the root document. This method returns {@code null} if no
+     * update was possible.
+     *
+     * @param documents the documents to sweep
+     * @param listener the listener to receive required sweep update operations.
+     * @return the new sweep revision or {@code null} if no updates were done.
+     * @throws DocumentStoreException if reading from the store or writing to
+     *          the store failed.
+     */
+    @Nullable
+    void sweep(@NotNull Iterable<NodeDocument> documents,
+                   @NotNull NodeDocumentSweepListener listener)
+            throws DocumentStoreException {
+        performSweep(documents, checkNotNull(listener));
+    }
+
+    /**
+     * @return the head revision vector in use by this sweeper.
+     */
+    RevisionVector getHeadRevision() {
+        return headRevision;
+    }
+
+    //----------------------------< internal >----------------------------------
+
+    @Nullable
+    private void performSweep(Iterable<NodeDocument> documents,
+                                  NodeDocumentSweepListener listener)
+            throws DocumentStoreException {
+        head = headRevision.getRevision(clusterId);
+        totalCount = 0;
+        lastCount = 0;
+        startOfScan = context.getClock().getTime();
+        lastLog = startOfScan;
+
+        if (head == null) {
+            LOG.warn("Head revision does not have an entry for " +
+                            "clusterId {}. Sweeping of documents is skipped.",
+                    clusterId);
+            return;
+        }
+
+        Iterable<Map.Entry<Path, UpdateOp>> ops = sweepOperations(documents);
+        for (List<Map.Entry<Path, UpdateOp>> batch : partition(ops, INVALIDATE_BATCH_SIZE)) {
+            Map<Path, UpdateOp> updates = newHashMap();
+            for (Map.Entry<Path, UpdateOp> entry : batch) {
+                updates.put(entry.getKey(), entry.getValue());
+            }
+            listener.sweepUpdate(updates);
+        }
+        LOG.debug("Document sweep finished");
+    }
+
+    private Iterable<Map.Entry<Path, UpdateOp>> sweepOperations(
+            final Iterable<NodeDocument> docs) {
+        return filter(transform(docs,
+                new Function<NodeDocument, Map.Entry<Path, UpdateOp>>() {
+
+            int yieldCnt = 0;
+            long lastYield = System.currentTimeMillis();
+
+            @Override
+            public Map.Entry<Path, UpdateOp> apply(NodeDocument doc) {
+                if (++yieldCnt >= YIELD_SIZE) {
+                    try {
+                        Thread.sleep(Math.max(1, System.currentTimeMillis() - lastYield));
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                    lastYield = System.currentTimeMillis();
+                    yieldCnt = 0;
+                }
+                return immutableEntry(doc.getPath(), sweepOne(doc));
+            }
+        }), new Predicate<Map.Entry<Path, UpdateOp>>() {
+            @Override
+            public boolean apply(Map.Entry<Path, UpdateOp> input) {
+                return input.getValue() != null;
+            }
+        });
+    }
+
+    private UpdateOp sweepOne(NodeDocument doc) throws DocumentStoreException {
+        UpdateOp op = createUpdateOp(doc);
+        // go through PROPERTY_OR_DELETED_OR_COMMITROOT, whereas :
+        // - PROPERTY : for content changes
+        // - DELETED : for new node (this)
+        // - COMMITROOT : for new child (parent)
+        // - REVISIONS : for commit roots (root for branch commits)
+        for (String property : filter(doc.keySet(), COMMITROOT_OR_REVISIONS)) {
+            Map<Revision, String> valueMap = doc.getLocalMap(property);
+            for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
+                Revision rev = entry.getKey();
+
+                // consider any clusterId for sweep2
+
+                Revision cRev = getCommitRevision(doc, rev);
+                if (cRev == null) {
+                    uncommitted(doc, property, rev, op);

Review comment:
       This will remove valid committed changes!
   
   NodeDocumentSweeper2 runs on a CommitValueResolver without sweep revisions. 
Starting with 1.8 and the introduction of the sweep revision, the revision 
garbage collector is allowed to remove split documents with commit information 
older than the sweep revision. Changes that refer to the removed commit 
information must still be considered committed because of the sweep revision.
