This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d404339 HDDS-5456. Inject a Clock into Replication Manager to allow
timeouts to be tested (#2425)
d404339 is described below
commit d4043397d566c4a1b49ba06264b2b94b127562ff
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Sun Jul 18 22:31:59 2021 +0100
HDDS-5456. Inject a Clock into Replication Manager to allow timeouts to be
tested (#2425)
---
.../apache/hadoop/ozone/common/MonotonicClock.java | 81 ++++++++++++++++++++++
.../hdds/scm/container/ReplicationManager.java | 19 ++---
.../hdds/scm/server/StorageContainerManager.java | 5 +-
.../hdds/scm/container/TestReplicationManager.java | 42 +++++++----
.../main/java/org/apache/ozone/test/TestClock.java | 76 ++++++++++++++++++++
5 files changed, 199 insertions(+), 24 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java
new file mode 100644
index 0000000..62a323d
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.util.Time;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+
+/**
+ * This is a class which implements the Clock interface. It is a copy of the
+ * Java Clock.SystemClock only it uses MonotonicNow (nanotime) rather than
+ * System.currentTimeMills.
+ */
+
+public final class MonotonicClock extends Clock {
+
+ private final ZoneId zoneId;
+
+ public MonotonicClock(ZoneId zone) {
+ this.zoneId = zone;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zoneId;
+ }
+
+ @Override
+ public Clock withZone(ZoneId zone) {
+ if (zone.equals(this.zoneId)) { // intentional NPE
+ return this;
+ }
+ return new MonotonicClock(zone);
+ }
+
+ @Override
+ public long millis() {
+ return Time.monotonicNow();
+ }
+
+ @Override
+ public Instant instant() {
+ return Instant.ofEpochMilli(millis());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof MonotonicClock) {
+ return zoneId.equals(((MonotonicClock) obj).zoneId);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return zoneId.hashCode() + 1;
+ }
+
+ @Override
+ public String toString() {
+ return "MonotonicClock[" + zoneId + "]";
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index d1218a8..2903653 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
+import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -72,7 +73,6 @@ import
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
@@ -166,6 +166,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
private final long waitTimeInMillis;
private long lastTimeToBeReadyInMillis = 0;
+ private final Clock clock;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -182,7 +183,8 @@ public class ReplicationManager implements MetricsSource,
SCMService {
final EventPublisher eventPublisher,
final SCMContext scmContext,
final SCMServiceManager serviceManager,
- final NodeManager nodeManager) {
+ final NodeManager nodeManager,
+ final java.time.Clock clock) {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
this.eventPublisher = eventPublisher;
@@ -193,6 +195,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
+ this.clock = clock;
this.waitTimeInMillis = conf.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
@@ -276,13 +279,13 @@ public class ReplicationManager implements MetricsSource,
SCMService {
private synchronized void run() {
try {
while (running) {
- final long start = Time.monotonicNow();
+ final long start = clock.millis();
final List<ContainerInfo> containers =
containerManager.getContainers();
containers.forEach(this::processContainer);
LOG.info("Replication Monitor Thread took {} milliseconds for" +
- " processing {} containers.", Time.monotonicNow() - start,
+ " processing {} containers.", clock.millis() - start,
containers.size());
wait(rmConf.getInterval());
@@ -446,7 +449,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
final Map<ContainerID, List<InflightAction>> inflightActions,
final Predicate<InflightAction> filter) {
final ContainerID id = container.containerID();
- final long deadline = Time.monotonicNow() - rmConf.getEventTimeout();
+ final long deadline = clock.millis() - rmConf.getEventTimeout();
if (inflightActions.containsKey(id)) {
final List<InflightAction> actions = inflightActions.get(id);
@@ -1087,7 +1090,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
final CommandForDatanode<T> datanodeCommand =
new CommandForDatanode<>(datanode.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
- tracker.accept(new InflightAction(datanode, Time.monotonicNow()));
+ tracker.accept(new InflightAction(datanode, clock.millis()));
}
/**
@@ -1279,7 +1282,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
// transition from PAUSING to RUNNING
if (serviceStatus != ServiceStatus.RUNNING) {
LOG.info("Service {} transitions to RUNNING.", getServiceName());
- lastTimeToBeReadyInMillis = Time.monotonicNow();
+ lastTimeToBeReadyInMillis = clock.millis();
serviceStatus = ServiceStatus.RUNNING;
}
} else {
@@ -1296,7 +1299,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
try {
// If safe mode is off, then this SCMService starts to run with a delay.
return serviceStatus == ServiceStatus.RUNNING &&
- Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+ clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
} finally {
serviceLock.unlock();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index c739757..be590d6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
@@ -147,6 +148,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -600,7 +602,8 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
eventQueue,
scmContext,
serviceManager,
- scmNodeManager);
+ scmNodeManager,
+ new MonotonicClock(ZoneOffset.UTC));
}
if(configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 5401d0a..38b06e1 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.TestClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -50,6 +51,8 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -89,6 +92,7 @@ public class TestReplicationManager {
private ContainerManagerV2 containerManager;
private OzoneConfiguration conf;
private SCMNodeManager scmNodeManager;
+ private TestClock clock;
@Before
public void setup()
@@ -150,19 +154,8 @@ public class TestReplicationManager {
Mockito.any(DatanodeDetails.class)))
.thenReturn(NodeStatus.inServiceHealthy());
- SCMServiceManager serviceManager = new SCMServiceManager();
-
- replicationManager = new ReplicationManager(
- conf,
- containerManager,
- containerPlacementPolicy,
- eventQueue,
- SCMContext.emptyContext(),
- serviceManager,
- nodeManager);
-
- serviceManager.notifyStatusChanged();
- Thread.sleep(100L);
+ clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
+ createReplicationManager(new ReplicationManagerConfiguration());
}
private void createReplicationManager(ReplicationManagerConfiguration rmConf)
@@ -174,7 +167,6 @@ public class TestReplicationManager {
config.setFromObject(rmConf);
SCMServiceManager serviceManager = new SCMServiceManager();
-
replicationManager = new ReplicationManager(
config,
containerManager,
@@ -182,7 +174,8 @@ public class TestReplicationManager {
eventQueue,
SCMContext.emptyContext(),
serviceManager,
- nodeManager);
+ nodeManager,
+ clock);
serviceManager.notifyStatusChanged();
Thread.sleep(100L);
@@ -1093,6 +1086,25 @@ public class TestReplicationManager {
assertReplicaScheduled(0);
}
+ @Test
+ public void testReplicateCommandTimeout() throws
+ SCMException, InterruptedException {
+ long timeout = new ReplicationManagerConfiguration().getEventTimeout();
+
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ assertReplicaScheduled(1);
+
+ // Already a pending replica, so nothing scheduled
+ assertReplicaScheduled(0);
+
+ // Advance the clock past the timeout, and there should be a replica
+ // scheduled
+ clock.fastForward(timeout + 1000);
+ assertReplicaScheduled(1);
+ }
+
private ContainerInfo createContainer(LifeCycleState containerState)
throws SCMException {
final ContainerInfo container = getContainer(containerState);
diff --git
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
new file mode 100644
index 0000000..8cf0c7e
--- /dev/null
+++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.temporal.TemporalAmount;
+
/**
 * A {@link Clock} whose current instant is set explicitly and can be moved
 * forwards or backwards on demand, so time-dependent logic can be tested
 * deterministically. Intended for use only in tests.
 *
 * <p>The instant is stored in a volatile field so that a value set by the
 * test thread is visible to other threads reading this clock (for example a
 * background service that was handed the clock). Individual updates are not
 * atomic, so the clock should be advanced from a single thread.
 */
public class TestClock extends Clock {

  private volatile Instant instant;
  private final ZoneId zoneId;

  /**
   * Creates a test clock frozen at {@code instant}.
   *
   * @param instant the initial instant, not null
   * @param zone the time-zone to report, not null
   * @throws NullPointerException if either argument is null
   */
  public TestClock(Instant instant, ZoneId zone) {
    this.instant = java.util.Objects.requireNonNull(instant, "instant");
    this.zoneId = java.util.Objects.requireNonNull(zone, "zone");
  }

  @Override
  public ZoneId getZone() {
    return zoneId;
  }

  /**
   * Returns a new test clock reporting {@code zone} and starting at this
   * clock's current instant. The copy is independent: moving one clock
   * afterwards does not move the other.
   *
   * @param zone the time-zone to change to, not null
   * @throws NullPointerException if {@code zone} is null
   */
  @Override
  public Clock withZone(ZoneId zone) {
    // Keep the fake instant; using Instant.now() here would silently swap
    // the controlled time for real wall-clock time.
    return new TestClock(instant, zone);
  }

  @Override
  public Instant instant() {
    return instant;
  }

  /** Moves the clock forward by {@code millis} milliseconds. */
  public void fastForward(long millis) {
    set(instant().plusMillis(millis));
  }

  /** Moves the clock forward by the given amount. */
  public void fastForward(TemporalAmount temporalAmount) {
    set(instant().plus(temporalAmount));
  }

  /** Moves the clock back by {@code millis} milliseconds. */
  public void rewind(long millis) {
    set(instant().minusMillis(millis));
  }

  /** Moves the clock back by the given amount. */
  public void rewind(TemporalAmount temporalAmount) {
    set(instant().minus(temporalAmount));
  }

  /**
   * Sets the clock to {@code newInstant}.
   *
   * @throws NullPointerException if {@code newInstant} is null
   */
  public void set(Instant newInstant) {
    this.instant = java.util.Objects.requireNonNull(newInstant, "newInstant");
  }

}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]