This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a348980 HDDS-6134. Move replication-specific config to ReplicationServer (#2943)
a348980 is described below
commit a348980cec368b7cd0a9acb09640146a24f62a2c
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jan 11 15:50:43 2022 +0100
HDDS-6134. Move replication-specific config to ReplicationServer (#2943)
---
.../common/statemachine/DatanodeConfiguration.java | 32 ---------
.../common/statemachine/DatanodeStateMachine.java | 5 +-
.../container/replication/ReplicationServer.java | 56 ++++++++++++++--
.../replication/ReplicationSupervisor.java | 8 +++
.../statemachine/TestDatanodeConfiguration.java | 11 ----
.../replication/TestReplicationConfig.java | 75 ++++++++++++++++++++++
6 files changed, 137 insertions(+), 50 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 14ae4c9..be83d9b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -37,8 +37,6 @@ public class DatanodeConfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(DatanodeConfiguration.class);
- static final String REPLICATION_STREAMS_LIMIT_KEY =
- "hdds.datanode.replication.streams.limit";
static final String CONTAINER_DELETE_THREADS_MAX_KEY =
"hdds.datanode.container.delete.threads.max";
static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
@@ -57,8 +55,6 @@ public class DatanodeConfiguration {
static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
- static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
-
static final long PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT = 60;
static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;
@@ -72,19 +68,6 @@ public class DatanodeConfiguration {
Duration.ofMinutes(10).toMillis();
/**
- * The maximum number of replication commands a single datanode can execute
- * simultaneously.
- */
- @Config(key = "replication.streams.limit",
- type = ConfigType.INT,
- defaultValue = "10",
- tags = {DATANODE},
- description = "The maximum number of replication commands a single " +
- "datanode can execute simultaneously"
- )
- private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
-
- /**
* Number of threads per volume that Datanode will use for chunk read.
*/
@Config(key = "read.chunk.threads.per.volume",
@@ -264,13 +247,6 @@ public class DatanodeConfiguration {
@PostConstruct
public void validate() {
- if (replicationMaxStreams < 1) {
- LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " +
- "and was set to {}. Defaulting to {}",
- replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
- replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
- }
-
if (containerDeleteThreads < 1) {
LOG.warn(CONTAINER_DELETE_THREADS_MAX_KEY + " must be greater than zero" +
" and was set to {}. Defaulting to {}",
@@ -316,18 +292,10 @@ public class DatanodeConfiguration {
}
}
- public void setReplicationMaxStreams(int replicationMaxStreams) {
- this.replicationMaxStreams = replicationMaxStreams;
- }
-
public void setContainerDeleteThreads(int containerDeleteThreads) {
this.containerDeleteThreads = containerDeleteThreads;
}
- public int getReplicationMaxStreams() {
- return replicationMaxStreams;
- }
-
public int getContainerDeleteThreads() {
return containerDeleteThreads;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ee5e87a..e1fc297 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
import org.apache.hadoop.ozone.container.replication.MeasuredReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
@@ -166,9 +167,11 @@ public class DatanodeStateMachine implements Closeable {
replicatorMetrics = new MeasuredReplicator(replicator);
+ ReplicationConfig replicationConfig =
+ conf.getObject(ReplicationConfig.class);
supervisor =
new ReplicationSupervisor(container.getContainerSet(), context,
- replicatorMetrics, dnConf.getReplicationMaxStreams());
+ replicatorMetrics, replicationConfig);
replicationSupervisorMetrics =
ReplicationSupervisorMetrics.create(supervisor);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index dd5f4c4..bf8d6f1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
-import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
@@ -39,6 +40,9 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.MANAGEMENT;
+
/**
* Separated network server for server2server container replication.
*/
@@ -128,12 +132,33 @@ public class ReplicationServer {
/**
* Replication-related configuration.
*/
- @ConfigGroup(prefix = "hdds.datanode.replication")
+ @ConfigGroup(prefix = ReplicationConfig.PREFIX)
public static final class ReplicationConfig {
- @Config(key = "port", defaultValue = "9886", description = "Port used for"
- + " the server2server replication server", tags = {
- ConfigTag.MANAGEMENT})
+ public static final String PREFIX = "hdds.datanode.replication";
+ public static final String STREAMS_LIMIT_KEY = "streams.limit";
+
+ public static final String REPLICATION_STREAMS_LIMIT_KEY =
+ PREFIX + "." + STREAMS_LIMIT_KEY;
+
+ public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
+
+ /**
+ * The maximum number of replication commands a single datanode can execute
+ * simultaneously.
+ */
+ @Config(key = STREAMS_LIMIT_KEY,
+ type = ConfigType.INT,
+ defaultValue = "10",
+ tags = {DATANODE},
+ description = "The maximum number of replication commands a single " +
+ "datanode can execute simultaneously"
+ )
+ private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
+
+ @Config(key = "port", defaultValue = "9886",
+ description = "Port used for the server2server replication server",
+ tags = {DATANODE, MANAGEMENT})
private int port;
public int getPort() {
@@ -144,6 +169,25 @@ public class ReplicationServer {
this.port = portParam;
return this;
}
+
+ public int getReplicationMaxStreams() {
+ return replicationMaxStreams;
+ }
+
+ public void setReplicationMaxStreams(int replicationMaxStreams) {
+ this.replicationMaxStreams = replicationMaxStreams;
+ }
+
+ @PostConstruct
+ public void validate() {
+ if (replicationMaxStreams < 1) {
+ LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " +
+ "and was set to {}. Defaulting to {}",
+ replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
+ replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
+ }
+ }
+
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 05a4173..4cb826c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
import com.google.common.annotations.VisibleForTesting;
@@ -73,6 +74,13 @@ public class ReplicationSupervisor {
public ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
+ ContainerReplicator replicator, ReplicationConfig replicationConfig) {
+ this(containerSet, context, replicator,
+ replicationConfig.getReplicationMaxStreams());
+ }
+
+ public ReplicationSupervisor(
+ ContainerSet containerSet, StateContext context,
ContainerReplicator replicator, int poolSize) {
this(containerSet, context, replicator, new ThreadPoolExecutor(
poolSize, poolSize, 60, TimeUnit.SECONDS,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
index 1b42654..5f1b0a6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
@@ -28,8 +28,6 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConf
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY;
-import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_MAX_STREAMS_DEFAULT;
-import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY;
@@ -46,14 +44,12 @@ public class TestDatanodeConfiguration {
@Test
public void acceptsValidValues() {
// GIVEN
- int validReplicationLimit = 123;
int validDeleteThreads = 42;
long validDiskCheckIntervalMinutes = 60;
int validFailedVolumesTolerated = 10;
long validDiskCheckMinGap = 2;
long validDiskCheckTimeout = 1;
OzoneConfiguration conf = new OzoneConfiguration();
- conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads);
conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
validDiskCheckIntervalMinutes);
@@ -70,7 +66,6 @@ public class TestDatanodeConfiguration {
DatanodeConfiguration subject =
conf.getObject(DatanodeConfiguration.class);
// THEN
- assertEquals(validReplicationLimit, subject.getReplicationMaxStreams());
assertEquals(validDeleteThreads, subject.getContainerDeleteThreads());
assertEquals(validDiskCheckIntervalMinutes,
subject.getPeriodicDiskCheckIntervalMinutes());
@@ -87,14 +82,12 @@ public class TestDatanodeConfiguration {
@Test
public void overridesInvalidValues() {
// GIVEN
- int invalidReplicationLimit = -5;
int invalidDeleteThreads = 0;
long invalidDiskCheckIntervalMinutes = -1;
int invalidFailedVolumesTolerated = -2;
long invalidDiskCheckMinGap = -1;
long invalidDiskCheckTimeout = -1;
OzoneConfiguration conf = new OzoneConfiguration();
- conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads);
conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
invalidDiskCheckIntervalMinutes);
@@ -111,8 +104,6 @@ public class TestDatanodeConfiguration {
DatanodeConfiguration subject =
conf.getObject(DatanodeConfiguration.class);
// THEN
- assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
- subject.getReplicationMaxStreams());
assertEquals(CONTAINER_DELETE_THREADS_DEFAULT,
subject.getContainerDeleteThreads());
assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
@@ -136,8 +127,6 @@ public class TestDatanodeConfiguration {
DatanodeConfiguration subject =
conf.getObject(DatanodeConfiguration.class);
// THEN
- assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
- subject.getReplicationMaxStreams());
assertEquals(CONTAINER_DELETE_THREADS_DEFAULT,
subject.getContainerDeleteThreads());
assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
new file mode 100644
index 0000000..6ab32d6
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
+import org.junit.Test;
+
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_MAX_STREAMS_DEFAULT;
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ReplicationConfig}.
+ */
+public class TestReplicationConfig {
+
+ @Test
+ public void acceptsValidValues() {
+ // GIVEN
+ int validReplicationLimit = 123;
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
+
+ // WHEN
+ ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+ // THEN
+ assertEquals(validReplicationLimit, subject.getReplicationMaxStreams());
+ }
+
+ @Test
+ public void overridesInvalidValues() {
+ // GIVEN
+ int invalidReplicationLimit = -5;
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
+
+ // WHEN
+ ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+ // THEN
+ assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
+ subject.getReplicationMaxStreams());
+ }
+
+ @Test
+ public void isCreatedWitDefaultValues() {
+ // GIVEN
+ OzoneConfiguration conf = new OzoneConfiguration();
+
+ // WHEN
+ ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+ // THEN
+ assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
+ subject.getReplicationMaxStreams());
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]