This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a348980  HDDS-6134. Move replication-specific config to 
ReplicationServer (#2943)
a348980 is described below

commit a348980cec368b7cd0a9acb09640146a24f62a2c
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jan 11 15:50:43 2022 +0100

    HDDS-6134. Move replication-specific config to ReplicationServer (#2943)
---
 .../common/statemachine/DatanodeConfiguration.java | 32 ---------
 .../common/statemachine/DatanodeStateMachine.java  |  5 +-
 .../container/replication/ReplicationServer.java   | 56 ++++++++++++++--
 .../replication/ReplicationSupervisor.java         |  8 +++
 .../statemachine/TestDatanodeConfiguration.java    | 11 ----
 .../replication/TestReplicationConfig.java         | 75 ++++++++++++++++++++++
 6 files changed, 137 insertions(+), 50 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 14ae4c9..be83d9b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -37,8 +37,6 @@ public class DatanodeConfiguration {
   private static final Logger LOG =
       LoggerFactory.getLogger(DatanodeConfiguration.class);
 
-  static final String REPLICATION_STREAMS_LIMIT_KEY =
-      "hdds.datanode.replication.streams.limit";
   static final String CONTAINER_DELETE_THREADS_MAX_KEY =
       "hdds.datanode.container.delete.threads.max";
   static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
@@ -57,8 +55,6 @@ public class DatanodeConfiguration {
 
   static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
 
-  static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
-
   static final long PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT = 60;
 
   static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;
@@ -72,19 +68,6 @@ public class DatanodeConfiguration {
       Duration.ofMinutes(10).toMillis();
 
   /**
-   * The maximum number of replication commands a single datanode can execute
-   * simultaneously.
-   */
-  @Config(key = "replication.streams.limit",
-      type = ConfigType.INT,
-      defaultValue = "10",
-      tags = {DATANODE},
-      description = "The maximum number of replication commands a single " +
-          "datanode can execute simultaneously"
-  )
-  private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
-
-  /**
    * Number of threads per volume that Datanode will use for chunk read.
    */
   @Config(key = "read.chunk.threads.per.volume",
@@ -264,13 +247,6 @@ public class DatanodeConfiguration {
 
   @PostConstruct
   public void validate() {
-    if (replicationMaxStreams < 1) {
-      LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " +
-              "and was set to {}. Defaulting to {}",
-          replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
-      replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
-    }
-
     if (containerDeleteThreads < 1) {
       LOG.warn(CONTAINER_DELETE_THREADS_MAX_KEY + " must be greater than zero" 
+
               " and was set to {}. Defaulting to {}",
@@ -316,18 +292,10 @@ public class DatanodeConfiguration {
     }
   }
 
-  public void setReplicationMaxStreams(int replicationMaxStreams) {
-    this.replicationMaxStreams = replicationMaxStreams;
-  }
-
   public void setContainerDeleteThreads(int containerDeleteThreads) {
     this.containerDeleteThreads = containerDeleteThreads;
   }
 
-  public int getReplicationMaxStreams() {
-    return replicationMaxStreams;
-  }
-
   public int getContainerDeleteThreads() {
     return containerDeleteThreads;
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ee5e87a..e1fc297 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -55,6 +55,7 @@ import 
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
 import 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
 import org.apache.hadoop.ozone.container.replication.MeasuredReplicator;
+import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import 
org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
 import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
@@ -166,9 +167,11 @@ public class DatanodeStateMachine implements Closeable {
 
     replicatorMetrics = new MeasuredReplicator(replicator);
 
+    ReplicationConfig replicationConfig =
+        conf.getObject(ReplicationConfig.class);
     supervisor =
         new ReplicationSupervisor(container.getContainerSet(), context,
-            replicatorMetrics, dnConf.getReplicationMaxStreams());
+            replicatorMetrics, replicationConfig);
 
     replicationSupervisorMetrics =
         ReplicationSupervisorMetrics.create(supervisor);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index dd5f4c4..bf8d6f1 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
-import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.PostConstruct;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
@@ -39,6 +40,9 @@ import 
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.MANAGEMENT;
+
 /**
  * Separated network server for server2server container replication.
  */
@@ -128,12 +132,33 @@ public class ReplicationServer {
   /**
    * Replication-related configuration.
    */
-  @ConfigGroup(prefix = "hdds.datanode.replication")
+  @ConfigGroup(prefix = ReplicationConfig.PREFIX)
   public static final class ReplicationConfig {
 
-    @Config(key = "port", defaultValue = "9886", description = "Port used for"
-        + " the server2server replication server", tags = {
-        ConfigTag.MANAGEMENT})
+    public static final String PREFIX = "hdds.datanode.replication";
+    public static final String STREAMS_LIMIT_KEY = "streams.limit";
+
+    public static final String REPLICATION_STREAMS_LIMIT_KEY =
+        PREFIX + "." + STREAMS_LIMIT_KEY;
+
+    public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
+
+    /**
+     * The maximum number of replication commands a single datanode can execute
+     * simultaneously.
+     */
+    @Config(key = STREAMS_LIMIT_KEY,
+        type = ConfigType.INT,
+        defaultValue = "10",
+        tags = {DATANODE},
+        description = "The maximum number of replication commands a single " +
+            "datanode can execute simultaneously"
+    )
+    private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
+
+    @Config(key = "port", defaultValue = "9886",
+        description = "Port used for the server2server replication server",
+        tags = {DATANODE, MANAGEMENT})
     private int port;
 
     public int getPort() {
@@ -144,6 +169,25 @@ public class ReplicationServer {
       this.port = portParam;
       return this;
     }
+
+    public int getReplicationMaxStreams() {
+      return replicationMaxStreams;
+    }
+
+    public void setReplicationMaxStreams(int replicationMaxStreams) {
+      this.replicationMaxStreams = replicationMaxStreams;
+    }
+
+    @PostConstruct
+    public void validate() {
+      if (replicationMaxStreams < 1) {
+        LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " 
+
+                "and was set to {}. Defaulting to {}",
+            replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
+        replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
+      }
+    }
+
   }
 
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 05a4173..4cb826c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -73,6 +74,13 @@ public class ReplicationSupervisor {
 
   public ReplicationSupervisor(
       ContainerSet containerSet, StateContext context,
+      ContainerReplicator replicator, ReplicationConfig replicationConfig) {
+    this(containerSet, context, replicator,
+        replicationConfig.getReplicationMaxStreams());
+  }
+
+  public ReplicationSupervisor(
+      ContainerSet containerSet, StateContext context,
       ContainerReplicator replicator, int poolSize) {
     this(containerSet, context, replicator, new ThreadPoolExecutor(
         poolSize, poolSize, 60, TimeUnit.SECONDS,
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
index 1b42654..5f1b0a6 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
@@ -28,8 +28,6 @@ import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConf
 import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT;
 import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT;
 import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY;
-import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_MAX_STREAMS_DEFAULT;
-import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY;
 import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY;
 import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
 import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY;
@@ -46,14 +44,12 @@ public class TestDatanodeConfiguration {
   @Test
   public void acceptsValidValues() {
     // GIVEN
-    int validReplicationLimit = 123;
     int validDeleteThreads = 42;
     long validDiskCheckIntervalMinutes = 60;
     int validFailedVolumesTolerated = 10;
     long validDiskCheckMinGap = 2;
     long validDiskCheckTimeout = 1;
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         validDiskCheckIntervalMinutes);
@@ -70,7 +66,6 @@ public class TestDatanodeConfiguration {
     DatanodeConfiguration subject = 
conf.getObject(DatanodeConfiguration.class);
 
     // THEN
-    assertEquals(validReplicationLimit, subject.getReplicationMaxStreams());
     assertEquals(validDeleteThreads, subject.getContainerDeleteThreads());
     assertEquals(validDiskCheckIntervalMinutes,
         subject.getPeriodicDiskCheckIntervalMinutes());
@@ -87,14 +82,12 @@ public class TestDatanodeConfiguration {
   @Test
   public void overridesInvalidValues() {
     // GIVEN
-    int invalidReplicationLimit = -5;
     int invalidDeleteThreads = 0;
     long invalidDiskCheckIntervalMinutes = -1;
     int invalidFailedVolumesTolerated = -2;
     long invalidDiskCheckMinGap = -1;
     long invalidDiskCheckTimeout = -1;
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         invalidDiskCheckIntervalMinutes);
@@ -111,8 +104,6 @@ public class TestDatanodeConfiguration {
     DatanodeConfiguration subject = 
conf.getObject(DatanodeConfiguration.class);
 
     // THEN
-    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
-        subject.getReplicationMaxStreams());
     assertEquals(CONTAINER_DELETE_THREADS_DEFAULT,
         subject.getContainerDeleteThreads());
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
@@ -136,8 +127,6 @@ public class TestDatanodeConfiguration {
     DatanodeConfiguration subject = 
conf.getObject(DatanodeConfiguration.class);
 
     // THEN
-    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
-        subject.getReplicationMaxStreams());
     assertEquals(CONTAINER_DELETE_THREADS_DEFAULT,
         subject.getContainerDeleteThreads());
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
new file mode 100644
index 0000000..6ab32d6
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
+import org.junit.Test;
+
+import static 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_MAX_STREAMS_DEFAULT;
+import static 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ReplicationConfig}.
+ */
+public class TestReplicationConfig {
+
+  @Test
+  public void acceptsValidValues() {
+    // GIVEN
+    int validReplicationLimit = 123;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
+
+    // WHEN
+    ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+    // THEN
+    assertEquals(validReplicationLimit, subject.getReplicationMaxStreams());
+  }
+
+  @Test
+  public void overridesInvalidValues() {
+    // GIVEN
+    int invalidReplicationLimit = -5;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
+
+    // WHEN
+    ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+    // THEN
+    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
+        subject.getReplicationMaxStreams());
+  }
+
+  @Test
+  public void isCreatedWitDefaultValues() {
+    // GIVEN
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    // WHEN
+    ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+    // THEN
+    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
+        subject.getReplicationMaxStreams());
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to