mreutegg commented on code in PR #635:
URL: https://github.com/apache/jackrabbit-oak/pull/635#discussion_r927376212


##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBuilder.java:
##########
@@ -100,6 +100,12 @@
     static final int MANY_CHILDREN_THRESHOLD = Integer.getInteger(
             "oak.documentMK.manyChildren", 50);
 
+    public static final int DEFAULT_THROTTLING_THRESHOLD = Integer.getInteger(

Review Comment:
   Can you please add a comment explaining what this threshold is about?



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Configuration.java:
##########
@@ -267,4 +267,13 @@
             @Option(label = "STRICT", value = "STRICT"),
             @Option(label = "LENIENT", value = "LENIENT")})
     String leaseCheckMode() default "STRICT";
+
+    @AttributeDefinition(
+            name = "Document Node Store throttling",
+            description = "Boolean value indicating whether throttling should be " +
+                    "enabled for document node store or not. The Default value is " +
+                    DocumentNodeStoreService.DEFAULT_THROTTLE_DOCUMENT_STORE +
+                    "Note that this value can be overridden via framework " +
+                    "property 'oak.documentstore.throttleDocumentStore'")

Review Comment:
   I'm not too happy with the name, because it suggests the DocumentStore is 
always throttled and _documentstore_ appears twice in the property name.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBuilder.java:
##########
@@ -100,6 +100,12 @@
     static final int MANY_CHILDREN_THRESHOLD = Integer.getInteger(
             "oak.documentMK.manyChildren", 50);
 
+    public static final int DEFAULT_THROTTLING_THRESHOLD = Integer.getInteger(
+            "oak.documentMK.throttlingThreshold", 2);
+
+    public static final long DEFAULT_THROTTLING_TIME = Long.getLong(

Review Comment:
   Please add a comment explaining what this is, and add the time unit to the 
name, e.g. throttlingTimeMs for milliseconds.
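
   A minimal sketch of what that could look like. The class name, the comment 
wording, the `_MS` suffix, the property name `oak.documentMK.throttlingTimeMs` 
and the default of 100 are placeholders for illustration only, not values taken 
from the PR:

```java
final class ThrottlingDefaults {

    /**
     * Value of the throttling metric at or below which writes start to be
     * throttled (assumed meaning; the default of 2 is the one in the PR).
     */
    static final int DEFAULT_THROTTLING_THRESHOLD = Integer.getInteger(
            "oak.documentMK.throttlingThreshold", 2);

    /**
     * Base time in milliseconds a write is delayed while throttling is
     * active. Property name and default are hypothetical placeholders.
     */
    static final long DEFAULT_THROTTLING_TIME_MS = Long.getLong(
            "oak.documentMK.throttlingTimeMs", 100L);

    private ThrottlingDefaults() {
        // constants only
    }
}
```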



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ThrottlingMetrics.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * Interface to expose throttling metrics.
+ *
+ * Concrete implementations of this interface are required to provide implementations
+ * of this class expose their throttling metrics
+ */
+public interface ThrottlingMetrics {
+
+    int threshold();
+
+    double currValue();
+
+    long throttlingTime();

Review Comment:
   Can you please add JavaDoc to these three methods and explain what those 
values are?
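
   A possible shape for that JavaDoc, as a sketch only. The descriptions are 
inferred from how `ThrottlingDocumentStoreWrapper` uses the three values in 
this PR and may not match the author's intent; in particular the millisecond 
unit is an assumption:

```java
/**
 * Exposes the values a throttling wrapper needs to decide whether, and for
 * how long, to pause write operations.
 */
interface ThrottlingMetrics {

    /**
     * @return the value of the metric at or below which throttling starts
     *         (inferred meaning)
     */
    int threshold();

    /**
     * @return the current value of the metric; for MongoDB this appears to
     *         be the oplog window
     */
    double currValue();

    /**
     * @return the base time to pause a write, assumed to be milliseconds
     */
    long throttlingTime();
}
```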



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreThrottling.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreThrottling;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class MongoDocumentStoreThrottling implements DocumentStoreThrottling {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStoreThrottling.class);
+    static final String TS_TIME = "ts";
+    private static final String NATURAL = "$natural";
+    private static final String MAX_SIZE = "maxSize";
+    private static final String OPLOG_RS = "oplog.rs";
+    public static final String SIZE = "size";
+    private final ScheduledExecutorService throttlingExecutor;
+    private final AtomicDouble oplogWindow;
+    private final MongoDatabase localDb;
+
+    public MongoDocumentStoreThrottling(final @NotNull MongoDatabase localDb, final @NotNull MongoThrottlingMetrics mongoThrottlingMetrics) {
+        this.throttlingExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.oplogWindow = mongoThrottlingMetrics.oplogWindow;
+        this.localDb = localDb;
+    }
+
+    public void updateMetrics() {
+        throttlingExecutor.scheduleAtFixedRate(() -> {
+            Document document = localDb.runCommand(new Document("collStats", OPLOG_RS));
+            if (!document.containsKey(MAX_SIZE) || !document.containsKey(SIZE)) {
+                LOG.warn("Could not get stats for local.{}  collection. collstats returned: {}.", OPLOG_RS, document);
+            } else {
+                int maxSize = document.getInteger(MAX_SIZE);
+                double maxSizeGb = (double) maxSize / (1024 * 1024 * 1024);
+                int usedSize = document.getInteger(SIZE);
+                double usedSizeGb = Math.ceil(((double) usedSize / (1024 * 1024 * 1024)) * 1000000) / 1000000;
+                MongoCollection<Document> localDbCollection = localDb.getCollection(OPLOG_RS);
+                Document first = localDbCollection.find().sort(new Document(NATURAL, 1)).limit(1).first();
+                Document last = localDbCollection.find().sort(new Document(NATURAL, -1)).limit(1).first();
+
+                if (Objects.isNull(first) || Objects.isNull(last)) {
+                    LOG.warn("Objects not found in local.oplog.rs -- is this a new and empty db instance?");
+                } else {
+                    if (!first.containsKey(TS_TIME) || !last.containsKey(TS_TIME)) {
+                        LOG.warn("ts element not found in oplog objects");
+                    } else {
+                        oplogWindow.set(updateOplogWindow(maxSizeGb, usedSizeGb, first, last));
+                    }
+                }
+            }
+        }, 10, 30, SECONDS);
+    }
+
+    // helper methods
+    @VisibleForTesting
+    double updateOplogWindow(final double maxSize, final double usedSize, final @NotNull Document first,

Review Comment:
   I think it's perfectly fine to have this package private. The method can 
even be static. It does not access any state of MongoDocumentStoreThrottling.
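
   To illustrate the point, a package-private static helper can be tested 
without constructing the enclosing class or a scheduler. The sketch below is 
hypothetical: the class name, the use of plain epoch seconds instead of BSON 
documents, and the extrapolation formula are assumptions, since the body of 
`updateOplogWindow` is not visible in this hunk:

```java
final class OplogWindowEstimator {

    private OplogWindowEstimator() {
        // static helper only
    }

    /**
     * Extrapolates the time span (in hours) the full oplog could hold from
     * the span covered by the part that is currently in use.
     * The formula is an illustrative assumption, not the PR's logic.
     */
    static double estimateHours(final double maxSizeGb, final double usedSizeGb,
                                final long firstTsSeconds, final long lastTsSeconds) {
        if (usedSizeGb <= 0) {
            return 0.0; // nothing written yet, no window to extrapolate
        }
        final double coveredHours = (lastTsSeconds - firstTsSeconds) / 3600.0;
        return coveredHours * maxSizeGb / usedSizeGb;
    }
}
```

   Because the method touches no instance state, a unit test only needs to 
pass in four numbers.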



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreThrottling.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreThrottling;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class MongoDocumentStoreThrottling implements DocumentStoreThrottling {

Review Comment:
   I think this class should have a different name. AFAICS it doesn't do any 
throttling. Its main purpose is to calculate the oplog window.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/ThrottlingDocumentStoreWrapper.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.math.DoubleMath;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.ThrottlingMetrics;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * Wrapper of another DocumentStore that does a throttling check on any method
+ * invocation (create or update) and throttled the system if under high load.
+ * <p>
+ *     TODO update issue
+ * @see "https://issues.apache.org/jira/browse/OAK-2739 for more details"

Review Comment:
   This looks unrelated. Remove?



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/ThrottlingDocumentStoreWrapper.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.math.DoubleMath;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.ThrottlingMetrics;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * Wrapper of another DocumentStore that does a throttling check on any method
+ * invocation (create or update) and throttled the system if under high load.
+ * <p>
+ *     TODO update issue
+ * @see "https://issues.apache.org/jira/browse/OAK-2739 for more details"
+ */
+public class ThrottlingDocumentStoreWrapper implements DocumentStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingDocumentStoreWrapper.class);
+
+    @NotNull
+    private final DocumentStore store;
+    public ThrottlingDocumentStoreWrapper(final @NotNull DocumentStore store) {
+        this.store = store;
+    }
+
+    @Override
+    public <T extends Document> T find(final Collection<T> collection, final String key) {
+        return store.find(collection, key);
+    }
+
+    @Override
+    public <T extends Document> T find(final Collection<T> collection, final String key,
+                                       final int maxCacheAge) {
+        return store.find(collection, key, maxCacheAge);
+    }
+
+    @NotNull
+    @Override
+    public <T extends Document> List<T> query(final Collection<T> collection, final String fromKey,
+                                              final String toKey, final int limit) {
+        return store.query(collection, fromKey, toKey, limit);
+    }
+
+    @Override
+    @NotNull
+    public <T extends Document> List<T> query(final Collection<T> collection, final String fromKey,
+                                              final String toKey, final String indexedProperty,
+                                              final long startValue, final int limit) {
+        return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection, String key) {
+        performThrottling();
+        store.remove(collection, key);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection, List<String> keys) {
+        performThrottling();
+        store.remove(collection, keys);
+    }
+
+    @Override
+    public <T extends Document> int remove(final Collection<T> collection, final Map<String, Long> toRemove) {
+        performThrottling();
+        return store.remove(collection, toRemove);
+    }
+
+    @Override
+    public <T extends Document> int remove(final Collection<T> collection, final String indexedProperty,
+                                           final long startValue, final long endValue) throws DocumentStoreException {
+        performThrottling();
+        return store.remove(collection, indexedProperty, startValue, endValue);
+    }
+
+    @Override
+    public <T extends Document> boolean create(final Collection<T> collection, final List<UpdateOp> updateOps) {
+        performThrottling();
+        return store.create(collection, updateOps);
+    }
+
+    @Override
+    public <T extends Document> T createOrUpdate(final Collection<T> collection, final UpdateOp update) {
+        performThrottling();
+        return store.createOrUpdate(collection, update);
+    }
+
+    @Override
+    public <T extends Document> List<T> createOrUpdate(final Collection<T> collection, final List<UpdateOp> updateOps) {
+        performThrottling();
+        return store.createOrUpdate(collection, updateOps);
+    }
+
+    @Override
+    public <T extends Document> T findAndUpdate(final Collection<T> collection, final UpdateOp update) {
+        performThrottling();
+        return store.findAndUpdate(collection, update);
+    }
+
+    @Override
+    public CacheInvalidationStats invalidateCache() {
+        return store.invalidateCache();
+    }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return store.invalidateCache(keys);
+    }
+
+    @Override
+    public <T extends Document> void invalidateCache(Collection<T> collection, String key) {
+        store.invalidateCache(collection, key);
+    }
+
+    @Override
+    public void dispose() {
+        store.dispose();
+    }
+
+    @Override
+    public <T extends Document> T getIfCached(final Collection<T> collection, final String key) {
+        return store.getIfCached(collection, key);
+    }
+
+    @Override
+    public void setReadWriteMode(String readWriteMode) {
+        store.setReadWriteMode(readWriteMode);
+    }
+
+    @Override
+    public Iterable<CacheStats> getCacheStats() {
+        return store.getCacheStats();
+    }
+
+    @Override
+    public Map<String, String> getMetadata() {
+        return store.getMetadata();
+    }
+
+    @NotNull
+    @Override
+    public Map<String, String> getStats() {
+        return store.getStats();
+    }
+
+    @Override
+    public long determineServerTimeDifferenceMillis() {
+        return store.determineServerTimeDifferenceMillis();
+    }
+
+    /**
+     * Return the size limit for node name based on the document store implementation
+     *
+     * @return node name size limit
+     */
+    @Override
+    public int getNodeNameLimit() {
+        return store.getNodeNameLimit();
+    }
+
+    /**
+     * Return the @{@link ThrottlingMetrics} for the underlying document store
+     *
+     * @return throttling metric for document store
+     */
+    @Override
+    public ThrottlingMetrics throttlingMetrics() {
+        return store.throttlingMetrics();
+    }
+
+    // helper methods
+
+    private void performThrottling() {
+
+        final ThrottlingMetrics metrics = throttlingMetrics();
+        long throttleTime = getThrottleTime(metrics);
+
+        if (throttleTime == 0) {
+            return; // no throttling
+        }
+
+        try {
+            LOG.info("Throttling the system for {} ms", throttleTime);
+            sleep(throttleTime);
+        } catch (InterruptedException e) {
+            // swallow the exception and log it
+            LOG.error("Error while throttling", e);
+        }
+    }
+
+     @VisibleForTesting
+     long getThrottleTime(final ThrottlingMetrics metrics) {
+
+        long throttleTime = metrics.throttlingTime();
+        final double threshold = metrics.threshold();
+        final double currValue = metrics.currValue();
+
+        if (DoubleMath.fuzzyCompare(currValue,threshold/8,  0.001) <= 0) {
+            throttleTime = throttleTime * 8;
+        } else if (DoubleMath.fuzzyCompare(currValue,threshold/4, 0.001) <= 0) {
+            throttleTime = throttleTime * 4;
+        } else if (DoubleMath.fuzzyCompare(currValue, threshold/2, 0.001) <= 0) {
+            throttleTime = throttleTime * 2;
+        } else if (DoubleMath.fuzzyCompare(currValue, threshold,0.001) <= 0) {
+            throttleTime = metrics.throttlingTime();

Review Comment:
   I don't understand this logic. Why is throttleTime only multiplied around 
certain thresholds and not when the value is between thresholds?
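
   For reference, Guava's `DoubleMath.fuzzyCompare(a, b, tolerance) <= 0` holds 
whenever `a` is less than `b` or within `tolerance` of it, so each branch 
would cover a range rather than a single point. A plain-Java sketch of the 
tiers as they appear to be intended, assuming a final `else` that returns 0 
(that branch is not visible in the hunk above). The class and method names are 
hypothetical and the `<=` checks are only roughly equivalent to `fuzzyCompare`:

```java
final class ThrottleTiers {

    private static final double TOLERANCE = 0.001;

    private ThrottleTiers() {
        // static helper only
    }

    /**
     * Returns the pause time for the given metric value: the smaller the
     * value relative to the threshold, the longer the pause.
     */
    static long throttleTime(final double currValue, final double threshold,
                             final long baseTime) {
        if (currValue <= threshold / 8 + TOLERANCE) {
            return baseTime * 8;   // at or below 1/8 of the threshold
        } else if (currValue <= threshold / 4 + TOLERANCE) {
            return baseTime * 4;   // between 1/8 and 1/4
        } else if (currValue <= threshold / 2 + TOLERANCE) {
            return baseTime * 2;   // between 1/4 and 1/2
        } else if (currValue <= threshold + TOLERANCE) {
            return baseTime;       // between 1/2 and the threshold
        }
        return 0;                  // above the threshold: assumed no throttling
    }
}
```

   If that reading is right, a short comment in `getThrottleTime` spelling out 
the ranges would make the intent much easier to follow.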



##########
oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoThrottlingMetricsTest.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.junit.Test;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoThrottlingMetrics.of;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class MongoThrottlingMetricsTest {
+
+    @Test
+    public void testCurrValue() {
+        MongoThrottlingMetrics mongoThrottlingMetrics = of(new AtomicDouble(MAX_VALUE), 10, 100);
+        assertEquals(MAX_VALUE, mongoThrottlingMetrics.currValue(), 0.001);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNonNullOplog() {

Review Comment:
   This test currently fails. See also Jenkins failure in this PR.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java:
##########
@@ -137,6 +137,7 @@ public class DocumentNodeStoreService {
     static final int DEFAULT_BLOB_CACHE_SIZE = 16;
     static final String DEFAULT_DB = "oak";
     static final boolean DEFAULT_SO_KEEP_ALIVE = true;
+    static final boolean DEFAULT_THROTTLE_DOCUMENT_STORE = true;

Review Comment:
   Since this is a new feature, I would rather disable it by default.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java:
##########
@@ -157,6 +158,16 @@ public synchronized int getNodeNameLimit() {
         return store.getNodeNameLimit();
     }
 
+    /**
+     * Return the @{@link ThrottlingMetrics} for the underlying document store
+     *
+     * @return throttling metric for document store
+     */
+    @Override
+    public ThrottlingMetrics throttlingMetrics() {

Review Comment:
   Method should be synchronized.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreThrottling.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * Marker interface to check whether underlying document store needs to throttle the incoming requests or not
+ *
+ * Concrete implementation needs to provide the logic for implementing the throttling based on their internal usages/statistics
+ */
+public interface DocumentStoreThrottling {

Review Comment:
   This interface is not used as a marker. The only implementation is 
MongoDocumentStoreThrottling. I don't see the value of this interface and would 
rather remove it.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreThrottling.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * Marker interface to check whether underlying document store needs to throttle the incoming requests or not
+ *
+ * Concrete implementation needs to provide the logic for implementing the throttling based on their internal usages/statistics
+ */
+public interface DocumentStoreThrottling {
+
+    /**
+     * Check whether the underlying document store need to throttle the system or not
+     */
+    void updateMetrics();

Review Comment:
   The name of this method and JavaDoc are misleading. The actual 
implementation schedules a task that runs periodically, whereas the method 
suggests a one-time update of metrics.
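
   One way to make the names match the behaviour is to separate the one-time 
update from the scheduling. This is a sketch under assumptions: the class and 
method names are hypothetical, and a `LongSupplier` stands in for the MongoDB 
oplog query:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class PeriodicMetricsUpdater implements AutoCloseable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier source; // stand-in for the real metric query
    private final AtomicLong latest = new AtomicLong();

    PeriodicMetricsUpdater(final LongSupplier source) {
        this.source = source;
    }

    /** Reads the metric once and stores the result. */
    void updateMetrics() {
        latest.set(source.getAsLong());
    }

    /** Schedules {@link #updateMetrics()} to run periodically and returns immediately. */
    ScheduledFuture<?> scheduleMetricsUpdates(final long initialDelay, final long period,
                                              final TimeUnit unit) {
        return executor.scheduleAtFixedRate(this::updateMetrics, initialDelay, period, unit);
    }

    /** Returns the most recently stored metric value. */
    long latest() {
        return latest.get();
    }

    /** Stops the background task so the executor thread does not leak. */
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

   With this split the JavaDoc of each method can state exactly what it does, 
and the executor gets an explicit shutdown path.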



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreThrottling.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreThrottling;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class MongoDocumentStoreThrottling implements DocumentStoreThrottling {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStoreThrottling.class);
+    static final String TS_TIME = "ts";
+    private static final String NATURAL = "$natural";
+    private static final String MAX_SIZE = "maxSize";
+    private static final String OPLOG_RS = "oplog.rs";
+    public static final String SIZE = "size";
+    private final ScheduledExecutorService throttlingExecutor;
+    private final AtomicDouble oplogWindow;
+    private final MongoDatabase localDb;
+
+    public MongoDocumentStoreThrottling(final @NotNull MongoDatabase localDb, final @NotNull MongoThrottlingMetrics mongoThrottlingMetrics) {
+        this.throttlingExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.oplogWindow = mongoThrottlingMetrics.oplogWindow;

Review Comment:
   Wouldn't it be nicer if the constructor just gets the `oplogWindow` 
`AtomicDouble`? It doesn't need anything else from `MongoThrottlingMetrics`. 



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java:
##########
@@ -295,15 +322,38 @@ public MongoDocumentStore(MongoClient connection, MongoDatabase db,
         this.nodeLocks = new StripedNodeDocumentLocks();
         this.nodesCache = builder.buildNodeDocumentCache(this, nodeLocks);
 
+        // if throttling is enabled
+        if (throttleDocumentStore) {
+            this.localDb = connection.getDatabase("local");

Review Comment:
   I think `localDb` can be turned into a local variable.



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreThrottling.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreThrottling;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class MongoDocumentStoreThrottling implements DocumentStoreThrottling {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStoreThrottling.class);
+    static final String TS_TIME = "ts";
+    private static final String NATURAL = "$natural";
+    private static final String MAX_SIZE = "maxSize";
+    private static final String OPLOG_RS = "oplog.rs";
+    public static final String SIZE = "size";
+    private final ScheduledExecutorService throttlingExecutor;
+    private final AtomicDouble oplogWindow;
+    private final MongoDatabase localDb;
+
+    public MongoDocumentStoreThrottling(final @NotNull MongoDatabase localDb, final @NotNull MongoThrottlingMetrics mongoThrottlingMetrics) {
+        this.throttlingExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.oplogWindow = mongoThrottlingMetrics.oplogWindow;
+        this.localDb = localDb;
+    }
+
+    public void updateMetrics() {
+        throttlingExecutor.scheduleAtFixedRate(() -> {
+            Document document = localDb.runCommand(new Document("collStats", OPLOG_RS));
+            if (!document.containsKey(MAX_SIZE) || !document.containsKey(SIZE)) {
+                LOG.warn("Could not get stats for local.{}  collection. collstats returned: {}.", OPLOG_RS, document);
+            } else {
+                int maxSize = document.getInteger(MAX_SIZE);
+                double maxSizeGb = (double) maxSize / (1024 * 1024 * 1024);
+                int usedSize = document.getInteger(SIZE);
+                double usedSizeGb = Math.ceil(((double) usedSize / (1024 * 1024 * 1024)) * 1000000) / 1000000;
+                MongoCollection<Document> localDbCollection = localDb.getCollection(OPLOG_RS);
+                Document first = localDbCollection.find().sort(new Document(NATURAL, 1)).limit(1).first();
+                Document last = localDbCollection.find().sort(new Document(NATURAL, -1)).limit(1).first();
+
+                if (Objects.isNull(first) || Objects.isNull(last)) {
+                    LOG.warn("Objects not found in local.oplog.rs -- is this a new and empty db instance?");
+                } else {
+                    if (!first.containsKey(TS_TIME) || !last.containsKey(TS_TIME)) {
+                        LOG.warn("ts element not found in oplog objects");
+                    } else {
+                        oplogWindow.set(updateOplogWindow(maxSizeGb, usedSizeGb, first, last));
+                    }
+                }
+            }
+        }, 10, 30, SECONDS);
+    }
+
+    // helper methods
+    @VisibleForTesting
+    double updateOplogWindow(final double maxSize, final double usedSize, final @NotNull Document first,
+                                   final @NotNull Document last) {
+        final BsonTimestamp startTime = first.get(TS_TIME, BsonTimestamp.class);
+        final BsonTimestamp lastTime = last.get(TS_TIME, BsonTimestamp.class);
+        long timeDiffSec = Math.abs(lastTime.getTime() - startTime.getTime());
+        double timeDiffHr = Math.ceil(((double)timeDiffSec/(60*60)) * 100000)/100000;
+        double currentOplogHourRate = usedSize /timeDiffHr;

Review Comment:
   I think this may result in a division by zero when the oplog has just a 
single entry. Add a test and fix?
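
   Note that Java floating-point division by zero does not throw; it yields
   `Infinity` (or `NaN` for `0.0 / 0.0`), so a bad value could pass through
   unnoticed. A minimal sketch of one possible guard follows. The class
   `OplogRateSketch`, the method `hourlyRate`, and the use of plain second
   values instead of `BsonTimestamp` are assumptions for illustration, not
   code from the PR.

   ```java
   import java.util.OptionalDouble;

   // Hypothetical helper, not part of the PR: oplog fill rate in GB per hour.
   final class OplogRateSketch {

       private OplogRateSketch() {
       }

       /**
        * @param usedSizeGb space currently used by the oplog, in GB
        * @param firstTsSec timestamp of the oldest oplog entry, in seconds
        * @param lastTsSec  timestamp of the newest oplog entry, in seconds
        * @return the fill rate in GB/hour, or empty when the timestamps are equal
        */
       static OptionalDouble hourlyRate(double usedSizeGb, long firstTsSec, long lastTsSec) {
           long timeDiffSec = Math.abs(lastTsSec - firstTsSec);
           if (timeDiffSec == 0) {
               // A single oplog entry (or identical timestamps): no rate can be derived.
               return OptionalDouble.empty();
           }
           // Same rounding as in the PR: hours, rounded up to five decimal places.
           double timeDiffHr = Math.ceil(((double) timeDiffSec / (60 * 60)) * 100000) / 100000;
           return OptionalDouble.of(usedSizeGb / timeDiffHr);
       }
   }
   ```

   A test for the single-entry case could then assert that
   `hourlyRate(1.0, 100L, 100L)` is empty.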



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreThrottling.java:
##########
@@ -0,0 +1,99 @@
+    // helper methods
+    @VisibleForTesting
+    double updateOplogWindow(final double maxSize, final double usedSize, final @NotNull Document first,
+                                   final @NotNull Document last) {
+        final BsonTimestamp startTime = first.get(TS_TIME, BsonTimestamp.class);
+        final BsonTimestamp lastTime = last.get(TS_TIME, BsonTimestamp.class);
+        long timeDiffSec = Math.abs(lastTime.getTime() - startTime.getTime());
+        double timeDiffHr = Math.ceil(((double)timeDiffSec/(60*60)) * 100000)/100000;
+        double currentOplogHourRate = usedSize /timeDiffHr;
+        double timeLeft = maxSize /currentOplogHourRate;

Review Comment:
   Similarly, when `usedSize` is zero, `currentOplogHourRate` is zero as 
well. This is probably an edge case that never occurs in practice, because it 
implies the oplog is empty, and then `updateMetrics()` does not call 
`updateOplogWindow()`. I think it would still be good to cover that case in 
`updateOplogWindow()`.
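
   One way to cover this case, as a sketch only: guard the second division
   before computing the time left. The class `OplogWindowSketch`, the method
   `hoursLeft`, and the choice of `Double.MAX_VALUE` as the value for an idle
   oplog are assumptions for illustration; the PR may prefer a different
   sentinel.

   ```java
   // Hypothetical helper, not part of the PR: remaining oplog window in hours.
   final class OplogWindowSketch {

       // Assumption: an idle oplog is reported as an effectively unbounded window.
       static final double UNBOUNDED = Double.MAX_VALUE;

       private OplogWindowSketch() {
       }

       static double hoursLeft(double maxSizeGb, double rateGbPerHour) {
           if (Double.isNaN(rateGbPerHour) || rateGbPerHour <= 0) {
               // usedSize == 0 gives a zero rate; dividing by it would yield Infinity.
               return UNBOUNDED;
           }
           return maxSizeGb / rateGbPerHour;
       }
   }
   ```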



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreThrottling.java:
##########
@@ -0,0 +1,99 @@
+public class MongoDocumentStoreThrottling implements DocumentStoreThrottling {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStoreThrottling.class);
+    static final String TS_TIME = "ts";
+    private static final String NATURAL = "$natural";
+    private static final String MAX_SIZE = "maxSize";
+    private static final String OPLOG_RS = "oplog.rs";
+    public static final String SIZE = "size";
+    private final ScheduledExecutorService throttlingExecutor;

Review Comment:
   The executor service should be shut down when it is no longer used. I think 
this means `MongoDocumentStoreThrottling` should implement `Closeable` and shut 
down the executor in `close()`.
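
   A minimal sketch of that lifecycle, using only JDK classes. The class
   `ClosableThrottlingSketch` and its methods `start` and `isShutdown` are
   hypothetical stand-ins; only the `Closeable` pattern is the point here.

   ```java
   import java.io.Closeable;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   // Hypothetical stand-in for MongoDocumentStoreThrottling; only the executor lifecycle is shown.
   final class ClosableThrottlingSketch implements Closeable {

       private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

       void start(Runnable task) {
           // Same schedule as updateMetrics() in the PR: first run after 10s, then every 30s.
           executor.scheduleAtFixedRate(task, 10, 30, TimeUnit.SECONDS);
       }

       boolean isShutdown() {
           return executor.isShutdown();
       }

       @Override
       public void close() {
           // Cancel pending runs and interrupt a run that is in progress.
           executor.shutdownNow();
       }
   }
   ```

   Whoever owns the throttling instance would then call `close()` when the
   store is disposed.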



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoThrottlingMetrics.java:
##########
@@ -0,0 +1,58 @@
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.jackrabbit.oak.plugins.document.ThrottlingMetrics;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Java Object to represent Mongo throttling metrics
+ */
+public class MongoThrottlingMetrics implements ThrottlingMetrics {

Review Comment:
   There is nothing MongoDB specific in this class. Maybe this is just a 
default implementation that can be moved to the parent package?



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java:
##########
@@ -443,6 +444,23 @@ public int getNodeNameLimit() {
         }
     }
 
+    /**
+     * Return the @{@link ThrottlingMetrics} for the underlying document store
+     *
+     * @return throttling metric for document store
+     */
+    @Override
+    public ThrottlingMetrics throttlingMetrics() {
+        try {
+            long start = now();
+            final ThrottlingMetrics result = base.throttlingMetrics();
+            updateAndLogTimes("getNodeNameLimit", start, 0, 0);

Review Comment:
   Wrong operation name.
   ```suggestion
               updateAndLogTimes("throttlingMetrics", start, 0, 0);
   ```



##########
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java:
##########
@@ -295,15 +322,38 @@ public MongoDocumentStore(MongoClient connection, MongoDatabase db,
         this.nodeLocks = new StripedNodeDocumentLocks();
         this.nodesCache = builder.buildNodeDocumentCache(this, nodeLocks);
 
+        // if throttling is enabled
+        if (throttleDocumentStore) {
+            this.localDb = connection.getDatabase("local");
+            final MongoIterable<String> collectionNames = localDb.listCollectionNames();
+            String ol = null;
+            for (String e: collectionNames) {
+                if (Objects.equals(e, OPLOG_RS)) {
+                    ol = OPLOG_RS;
+                    break;
+                }
+            }

Review Comment:
   Could use Guava instead.
   ```suggestion
               String ol = Iterables.tryFind(localDb.listCollectionNames(), 
                       s -> Objects.equals(s, OPLOG_RS)).orNull();
   ```
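
   If adding a Guava call here is not desired, a plain JDK variant is also
   possible. The sketch below assumes the surrounding code only needs to know
   whether the collection exists, so it returns a boolean instead of a
   nullable name; `CollectionLookupSketch` and `hasOplog` are hypothetical
   names. It accepts any `Iterable<String>`.

   ```java
   import java.util.stream.StreamSupport;

   // Hypothetical helper, not part of the PR: checks for the oplog collection by name.
   final class CollectionLookupSketch {

       static final String OPLOG_RS = "oplog.rs";

       private CollectionLookupSketch() {
       }

       static boolean hasOplog(Iterable<String> collectionNames) {
           // Stops at the first match, like the loop with break in the diff.
           return StreamSupport.stream(collectionNames.spliterator(), false)
                   .anyMatch(OPLOG_RS::equals);
       }
   }
   ```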


