This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d404339  HDDS-5456. Inject a Clock into Replication Manager to allow timeouts to be tested (#2425)
d404339 is described below

commit d4043397d566c4a1b49ba06264b2b94b127562ff
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Sun Jul 18 22:31:59 2021 +0100

    HDDS-5456. Inject a Clock into Replication Manager to allow timeouts to be tested (#2425)
---
 .../apache/hadoop/ozone/common/MonotonicClock.java | 81 ++++++++++++++++++++++
 .../hdds/scm/container/ReplicationManager.java     | 19 ++---
 .../hdds/scm/server/StorageContainerManager.java   |  5 +-
 .../hdds/scm/container/TestReplicationManager.java | 42 +++++++----
 .../main/java/org/apache/ozone/test/TestClock.java | 76 ++++++++++++++++++++
 5 files changed, 199 insertions(+), 24 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java
new file mode 100644
index 0000000..62a323d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.util.Time;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Objects;
+
+/**
+ * A {@link Clock} modelled on the JDK's {@code Clock.SystemClock}, except that
+ * it reads {@link Time#monotonicNow()} (based on nanoTime) instead of
+ * {@link System#currentTimeMillis()}.
+ * <p>
+ * The values are only meaningful for measuring elapsed time, e.g. timeouts.
+ * They are NOT wall-clock time: {@link #instant()} wraps the monotonic
+ * millisecond count in an {@link Instant} only to satisfy the {@link Clock}
+ * contract, so it must not be interpreted as a calendar date.
+ */
+
+public final class MonotonicClock extends Clock {
+
+  private final ZoneId zoneId;
+
+  /**
+   * @param zone the zone reported by {@link #getZone()}; must not be null
+   * @throws NullPointerException if {@code zone} is null
+   */
+  public MonotonicClock(ZoneId zone) {
+    this.zoneId = Objects.requireNonNull(zone, "zone");
+  }
+
+  @Override
+  public ZoneId getZone() {
+    return zoneId;
+  }
+
+  @Override
+  public Clock withZone(ZoneId zone) {
+    // A null zone deliberately fails here with an NPE, like Clock.SystemClock.
+    if (zone.equals(this.zoneId)) {
+      return this;
+    }
+    return new MonotonicClock(zone);
+  }
+
+  /** Returns {@link Time#monotonicNow()}; not milliseconds since the epoch. */
+  @Override
+  public long millis() {
+    return Time.monotonicNow();
+  }
+
+  /**
+   * Returns {@link #millis()} wrapped in an {@link Instant}. This is not a
+   * wall-clock instant and must only be compared with other values from it.
+   */
+  @Override
+  public Instant instant() {
+    return Instant.ofEpochMilli(millis());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof MonotonicClock) {
+      return zoneId.equals(((MonotonicClock) obj).zoneId);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return zoneId.hashCode() + 1;
+  }
+
+  @Override
+  public String toString() {
+    return "MonotonicClock[" + zoneId + "]";
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index d1218a8..2903653 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -72,7 +73,6 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.GeneratedMessage;
@@ -166,6 +166,7 @@ public class ReplicationManager implements MetricsSource, SCMService {
   private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
   private final long waitTimeInMillis;
   private long lastTimeToBeReadyInMillis = 0;
+  private final Clock clock;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -182,7 +183,8 @@ public class ReplicationManager implements MetricsSource, SCMService {
                             final EventPublisher eventPublisher,
                             final SCMContext scmContext,
                             final SCMServiceManager serviceManager,
-                            final NodeManager nodeManager) {
+                            final NodeManager nodeManager,
+                            final java.time.Clock clock) {
     this.containerManager = containerManager;
     this.containerPlacement = containerPlacement;
     this.eventPublisher = eventPublisher;
@@ -193,6 +195,7 @@ public class ReplicationManager implements MetricsSource, SCMService {
     this.inflightReplication = new ConcurrentHashMap<>();
     this.inflightDeletion = new ConcurrentHashMap<>();
     this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
+    this.clock = clock;
 
     this.waitTimeInMillis = conf.getTimeDuration(
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
@@ -276,13 +279,13 @@ public class ReplicationManager implements MetricsSource, SCMService {
   private synchronized void run() {
     try {
       while (running) {
-        final long start = Time.monotonicNow();
+        final long start = clock.millis();
         final List<ContainerInfo> containers =
             containerManager.getContainers();
         containers.forEach(this::processContainer);
 
         LOG.info("Replication Monitor Thread took {} milliseconds for" +
-                " processing {} containers.", Time.monotonicNow() - start,
+                " processing {} containers.", clock.millis() - start,
             containers.size());
 
         wait(rmConf.getInterval());
@@ -446,7 +449,7 @@ public class ReplicationManager implements MetricsSource, SCMService {
       final Map<ContainerID, List<InflightAction>> inflightActions,
       final Predicate<InflightAction> filter) {
     final ContainerID id = container.containerID();
-    final long deadline = Time.monotonicNow() - rmConf.getEventTimeout();
+    final long deadline = clock.millis() - rmConf.getEventTimeout();
     if (inflightActions.containsKey(id)) {
       final List<InflightAction> actions = inflightActions.get(id);
 
@@ -1087,7 +1090,7 @@ public class ReplicationManager implements MetricsSource, SCMService {
     final CommandForDatanode<T> datanodeCommand =
         new CommandForDatanode<>(datanode.getUuid(), command);
     eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
-    tracker.accept(new InflightAction(datanode, Time.monotonicNow()));
+    tracker.accept(new InflightAction(datanode, clock.millis()));
   }
 
   /**
@@ -1279,7 +1282,7 @@ public class ReplicationManager implements MetricsSource, SCMService {
         // transition from PAUSING to RUNNING
         if (serviceStatus != ServiceStatus.RUNNING) {
           LOG.info("Service {} transitions to RUNNING.", getServiceName());
-          lastTimeToBeReadyInMillis = Time.monotonicNow();
+          lastTimeToBeReadyInMillis = clock.millis();
           serviceStatus = ServiceStatus.RUNNING;
         }
       } else {
@@ -1296,7 +1299,7 @@ public class ReplicationManager implements MetricsSource, SCMService {
     try {
       // If safe mode is off, then this SCMService starts to run with a delay.
       return serviceStatus == ServiceStatus.RUNNING &&
-          Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+          clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
     } finally {
       serviceLock.unlock();
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index c739757..be590d6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
@@ -147,6 +148,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -600,7 +602,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
           eventQueue,
           scmContext,
           serviceManager,
-          scmNodeManager);
+          scmNodeManager,
+          new MonotonicClock(ZoneOffset.UTC));
     }
     if(configurator.getScmSafeModeManager() != null) {
       scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 5401d0a..38b06e1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.TestClock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,6 +51,8 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -89,6 +92,7 @@ public class TestReplicationManager {
   private ContainerManagerV2 containerManager;
   private OzoneConfiguration conf;
   private SCMNodeManager scmNodeManager;
+  private TestClock clock;
 
   @Before
   public void setup()
@@ -150,19 +154,8 @@ public class TestReplicationManager {
         Mockito.any(DatanodeDetails.class)))
         .thenReturn(NodeStatus.inServiceHealthy());
 
-    SCMServiceManager serviceManager = new SCMServiceManager();
-
-    replicationManager = new ReplicationManager(
-        conf,
-        containerManager,
-        containerPlacementPolicy,
-        eventQueue,
-        SCMContext.emptyContext(),
-        serviceManager,
-        nodeManager);
-
-    serviceManager.notifyStatusChanged();
-    Thread.sleep(100L);
+    clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
+    createReplicationManager(new ReplicationManagerConfiguration());
   }
 
   private void createReplicationManager(ReplicationManagerConfiguration rmConf)
@@ -174,7 +167,6 @@ public class TestReplicationManager {
     config.setFromObject(rmConf);
 
     SCMServiceManager serviceManager = new SCMServiceManager();
-
     replicationManager = new ReplicationManager(
         config,
         containerManager,
@@ -182,7 +174,8 @@ public class TestReplicationManager {
         eventQueue,
         SCMContext.emptyContext(),
         serviceManager,
-        nodeManager);
+        nodeManager,
+        clock);
 
     serviceManager.notifyStatusChanged();
     Thread.sleep(100L);
@@ -1093,6 +1086,25 @@ public class TestReplicationManager {
     assertReplicaScheduled(0);
   }
 
+  @Test
+  public void testReplicateCommandTimeout() throws
+      SCMException, InterruptedException {
+    long timeout = new ReplicationManagerConfiguration().getEventTimeout();
+
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    assertReplicaScheduled(1);
+    // The replication scheduled above is still pending and the injected
+    // clock has not moved, so a second pass schedules nothing new.
+    assertReplicaScheduled(0);
+
+    // Advance the injected TestClock past the default event timeout; the
+    // pending replication then counts as timed out and is scheduled again.
+    clock.fastForward(timeout + 1000);
+    assertReplicaScheduled(1);
+  }
+
   private ContainerInfo createContainer(LifeCycleState containerState)
       throws SCMException {
     final ContainerInfo container = getContainer(containerState);
diff --git a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
new file mode 100644
index 0000000..8cf0c7e
--- /dev/null
+++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ozone.test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.temporal.TemporalAmount;
+import java.util.Objects;
+
+/**
+ * An implementation of Clock which allows the time to be set to an instant and
+ * moved forward and back. Intended for use only in tests.
+ * <p>
+ * The clock never advances on its own: {@link #instant()} keeps returning the
+ * same value until {@link #set}, {@link #fastForward} or {@link #rewind} is
+ * called.
+ */
+
+public class TestClock extends Clock {
+
+  // volatile so that a change made by the test thread is visible to code
+  // under test that reads the clock on another thread. Updates are not
+  // atomic, so only one thread should move the clock.
+  private volatile Instant instant;
+  private final ZoneId zoneId;
+
+  /**
+   * @param instant the initial time reported by this clock; must not be null
+   * @param zone the zone reported by {@link #getZone()}; must not be null
+   */
+  public TestClock(Instant instant, ZoneId zone) {
+    this.instant = Objects.requireNonNull(instant, "instant");
+    this.zoneId = Objects.requireNonNull(zone, "zone");
+  }
+
+  @Override
+  public ZoneId getZone() {
+    return zoneId;
+  }
+
+  /**
+   * Returns an independent clock in {@code zone} that starts at this clock's
+   * current instant. Moving either clock afterwards does not affect the other.
+   */
+  @Override
+  public Clock withZone(ZoneId zone) {
+    // Keep the controlled time; Instant.now() would silently jump the copy
+    // to the real wall-clock time and defeat the purpose of this class.
+    return new TestClock(instant, zone);
+  }
+
+  @Override
+  public Instant instant() {
+    return instant;
+  }
+
+  /** Moves the clock forward by {@code millis} milliseconds. */
+  public void fastForward(long millis) {
+    set(instant().plusMillis(millis));
+  }
+
+  /** Moves the clock forward by {@code temporalAmount}. */
+  public void fastForward(TemporalAmount temporalAmount) {
+    set(instant().plus(temporalAmount));
+  }
+
+  /** Moves the clock back by {@code millis} milliseconds. */
+  public void rewind(long millis) {
+    set(instant().minusMillis(millis));
+  }
+
+  /** Moves the clock back by {@code temporalAmount}. */
+  public void rewind(TemporalAmount temporalAmount) {
+    set(instant().minus(temporalAmount));
+  }
+
+  /**
+   * Sets the clock to {@code newInstant}.
+   *
+   * @param newInstant the new current time; must not be null
+   */
+  public void set(Instant newInstant) {
+    this.instant = Objects.requireNonNull(newInstant, "newInstant");
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to