This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f388317a6c8 HDDS-12554. Support callback on completed reconfiguration
(#8391)
f388317a6c8 is described below
commit f388317a6c848708fe762e833e5201ceecee0b6f
Author: Sarveksha Yeshavantha Raju
<[email protected]>
AuthorDate: Fri Jun 6 12:08:39 2025 +0530
HDDS-12554. Support callback on completed reconfiguration (#8391)
---
.../hadoop/hdds/utils/BackgroundService.java | 43 +++--
.../apache/hadoop/ozone/HddsDatanodeService.java | 2 +
.../hadoop/hdds/conf/ReconfigurableBase.java | 209 +++++++++++++++++++++
.../hdds/conf/ReconfigurationChangeCallback.java | 29 +++
.../hadoop/hdds/conf/ReconfigurationHandler.java | 54 +++++-
.../hdds/scm/server/StorageContainerManager.java | 2 +
.../ozone/reconfig/TestOmReconfiguration.java | 2 +
.../hadoop/ozone/shell/TestReconfigShell.java | 55 +++++-
.../src/test/resources/ozone-site.xml | 12 ++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 13 +-
.../ozone/om/service/DirectoryDeletingService.java | 23 +++
11 files changed, 428 insertions(+), 16 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
index 959bee8d8c5..a5df9a1776e 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
@@ -41,12 +41,14 @@ public abstract class BackgroundService {
LoggerFactory.getLogger(BackgroundService.class);
// Executor to launch child tasks
- private final ScheduledThreadPoolExecutor exec;
- private final ThreadGroup threadGroup;
+ private ScheduledThreadPoolExecutor exec;
+ private ThreadGroup threadGroup;
private final String serviceName;
- private final long interval;
+ private long interval;
private final long serviceTimeoutInNanos;
- private final TimeUnit unit;
+ private TimeUnit unit;
+ private final int threadPoolSize;
+ private final String threadNamePrefix;
private final PeriodicalTask service;
public BackgroundService(String serviceName, long interval,
@@ -62,14 +64,9 @@ public BackgroundService(String serviceName, long interval,
this.serviceName = serviceName;
this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit)
.toLong(TimeUnit.NANOSECONDS);
- threadGroup = new ThreadGroup(serviceName);
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setThreadFactory(r -> new Thread(threadGroup, r))
- .setDaemon(true)
- .setNameFormat(threadNamePrefix + serviceName + "#%d")
- .build();
- exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
- threadPoolSize, threadFactory);
+ this.threadPoolSize = threadPoolSize;
+ this.threadNamePrefix = threadNamePrefix;
+ initExecutorAndThreadGroup();
service = new PeriodicalTask();
}
@@ -103,10 +100,20 @@ public void runPeriodicalTaskNow() throws Exception {
}
// start service
- public void start() {
+ public synchronized void start() {
+ if (exec == null || exec.isShutdown() || exec.isTerminated()) {
+ initExecutorAndThreadGroup();
+ }
+ LOG.info("Starting service {} with interval {} {}", serviceName,
+ interval, unit.name().toLowerCase());
exec.scheduleWithFixedDelay(service, 0, interval, unit);
}
+ /**
+ * Replaces the stored scheduling interval and its time unit.
+ * Only the fields are updated here; start() hands them to
+ * scheduleWithFixedDelay the next time it is called.
+ *
+ * @param newInterval new delay between periodic task runs
+ * @param newUnit time unit of newInterval
+ */
+ protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
+ this.interval = newInterval;
+ this.unit = newUnit;
+ }
+
public abstract BackgroundTaskQueue getTasks();
/**
@@ -172,4 +179,14 @@ public void shutdown() {
threadGroup.destroy();
}
}
+
+ /**
+ * Creates a new thread group named after the service and a scheduled
+ * executor with threadPoolSize daemon threads created in that group.
+ * Called from the constructor, and again from start() when the executor
+ * is null, shut down or terminated.
+ */
+ private void initExecutorAndThreadGroup() {
+ threadGroup = new ThreadGroup(serviceName);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setThreadFactory(r -> new Thread(threadGroup, r))
+ .setDaemon(true)
+ .setNameFormat(threadNamePrefix + serviceName + "#%d")
+ .build();
+ exec = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 585cab9d38a..31dab87935e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -287,6 +287,8 @@ public String getNamespace() {
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);
+
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
+
datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails,
conf,
dnCertClient, secretKeyClient, this::terminateDatanode,
reconfigurationHandler);
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
new file mode 100644
index 00000000000..3d3f7e29662
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.conf;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.ConfigRedactor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Reconfigurable;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class to support dynamic reconfiguration of configuration properties at runtime.
+ *
+ * <p>A reconfiguration task runs on a dedicated daemon thread. It compares the
+ * configuration returned by {@link #getNewConf()} with the current one, applies every
+ * changed property that is reconfigurable via
+ * {@link #reconfigurePropertyImpl(String, String)}, records a per-property result and
+ * then invokes the registered completion callbacks.
+ *
+ * <p>The task thread, start/end time, result map and callback list are guarded by
+ * {@code reconfigLock}.
+ */
+public abstract class ReconfigurableBase extends Configured implements Reconfigurable {
+  private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableBase.class);
+  private final ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
+  // Non-null only while a reconfiguration task is in flight.
+  private Thread reconfigThread = null;
+  // Cleared by shutdownReconfigurationTask(); no task may be started afterwards.
+  private volatile boolean shouldRun = true;
+  private final Object reconfigLock = new Object();
+  private long startTime = 0L;
+  private long endTime = 0L;
+  // Result of the last finished task: change -> error message (empty on success).
+  private Map<ReconfigurationUtil.PropertyChange, Optional<String>> status = null;
+  private final Collection<Consumer<ReconfigurationTaskStatus>> reconfigurationCompleteCallbacks =
+      new ArrayList<>();
+
+  /**
+   * @param conf initial configuration; a fresh {@link Configuration} is used when null
+   */
+  public ReconfigurableBase(Configuration conf) {
+    super(conf == null ? new Configuration() : conf);
+  }
+
+  /**
+   * @return the configuration that the reconfiguration task should converge to
+   */
+  protected abstract Configuration getNewConf();
+
+  /**
+   * Computes the property differences between two configurations.
+   *
+   * @param newConf configuration holding the desired values
+   * @param oldConf configuration holding the current values
+   * @return the changes reported by {@link ReconfigurationUtil#parseChangedProperties}
+   */
+  @VisibleForTesting
+  public Collection<ReconfigurationUtil.PropertyChange> getChangedProperties(Configuration newConf,
+      Configuration oldConf) {
+    return this.reconfigurationUtil.parseChangedProperties(newConf, oldConf);
+  }
+
+  /**
+   * Starts a background reconfiguration task on a daemon thread.
+   *
+   * @throws IOException if the server was stopped via {@link #shutdownReconfigurationTask()}
+   *     or another reconfiguration task is still running
+   */
+  public void startReconfigurationTask() throws IOException {
+    synchronized (this.reconfigLock) {
+      if (!this.shouldRun) {
+        String errorMessage = "The server is stopped.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      }
+      if (this.reconfigThread != null) {
+        String errorMessage = "Another reconfiguration task is running.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      }
+      this.reconfigThread = new ReconfigurationThread(this);
+      this.reconfigThread.setDaemon(true);
+      this.reconfigThread.setName("Reconfiguration Task");
+      this.reconfigThread.start();
+      // The task cannot publish its end state before this assignment because it
+      // needs reconfigLock, which is still held here.
+      this.startTime = Time.now();
+    }
+  }
+
+  /**
+   * @return while a task is running, a status with the start time, end time 0 and no
+   *     results; otherwise the start time, end time and results of the last task
+   */
+  public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
+    synchronized (this.reconfigLock) {
+      return this.reconfigThread != null
+          ? new ReconfigurationTaskStatus(this.startTime, 0L, null)
+          : new ReconfigurationTaskStatus(this.startTime, this.endTime, this.status);
+    }
+  }
+
+  /**
+   * Prevents further reconfiguration tasks from starting and waits for a running one
+   * to finish. If the waiting thread is interrupted, the wait is abandoned and the
+   * interrupt flag is restored for the caller.
+   */
+  public void shutdownReconfigurationTask() {
+    Thread tempThread;
+    synchronized (this.reconfigLock) {
+      this.shouldRun = false;
+      if (this.reconfigThread == null) {
+        return;
+      }
+
+      tempThread = this.reconfigThread;
+      this.reconfigThread = null;
+    }
+
+    // Join outside the lock: the task needs reconfigLock to publish its results.
+    try {
+      tempThread.join();
+    } catch (InterruptedException e) {
+      // Do not swallow the interruption; let the caller observe it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Applies a single property change immediately on the calling thread.
+   *
+   * @param property name of the property to change
+   * @param newVal new value, or null to unset the property
+   * @throws ReconfigurationException if the property is not reconfigurable or the
+   *     implementation rejects the change
+   */
+  @Override
+  public final void reconfigureProperty(String property, String newVal) throws ReconfigurationException {
+    if (!this.isPropertyReconfigurable(property)) {
+      throw new ReconfigurationException(property, newVal, this.getConf().get(property));
+    }
+    LOG.info("changing property {} to {}", property, newVal);
+    synchronized (this.getConf()) {
+      // NOTE(review): result is unused; presumably kept to force the configuration
+      // to load before it is modified - confirm or remove.
+      this.getConf().get(property);
+      String effectiveValue = this.reconfigurePropertyImpl(property, newVal);
+      if (newVal != null) {
+        this.getConf().set(property, effectiveValue);
+      } else {
+        this.getConf().unset(property);
+      }
+    }
+  }
+
+  @Override
+  public abstract Collection<String> getReconfigurableProperties();
+
+  @Override
+  public boolean isPropertyReconfigurable(String property) {
+    return this.getReconfigurableProperties().contains(property);
+  }
+
+  /**
+   * Applies one property change in the concrete service.
+   *
+   * @param property name of the property being changed
+   * @param newVal new value, or null when the property is being unset
+   * @return the effective value that is stored in the configuration when newVal is non-null
+   * @throws ReconfigurationException if the change cannot be applied
+   */
+  protected abstract String reconfigurePropertyImpl(String property, String newVal)
+      throws ReconfigurationException;
+
+  /**
+   * Registers a callback that is invoked with the task status after a reconfiguration
+   * task has finished normally.
+   *
+   * @param callback consumer of the final task status
+   */
+  public void addReconfigurationCompleteCallback(Consumer<ReconfigurationTaskStatus> callback) {
+    synchronized (reconfigLock) {
+      this.reconfigurationCompleteCallbacks.add(callback);
+    }
+  }
+
+  /**
+   * Thread that applies all pending property changes of its parent.
+   */
+  private static class ReconfigurationThread extends Thread {
+    private final ReconfigurableBase parent;
+
+    ReconfigurationThread(ReconfigurableBase base) {
+      this.parent = base;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Starting reconfiguration task.");
+      Map<ReconfigurationUtil.PropertyChange, Optional<String>> results = Maps.newHashMap();
+      boolean finished = false;
+      try {
+        applyChanges(results);
+        finished = true;
+      } finally {
+        // Always release the "task running" state, even if an unchecked exception
+        // escapes; otherwise every later startReconfigurationTask() call would be
+        // rejected with "Another reconfiguration task is running.".
+        complete(results, finished);
+      }
+    }
+
+    /**
+     * Applies every reconfigurable change and records its outcome in results.
+     * Changes to non-reconfigurable properties are only logged.
+     */
+    private void applyChanges(Map<ReconfigurationUtil.PropertyChange, Optional<String>> results) {
+      Configuration oldConf = this.parent.getConf();
+      Configuration newConf = this.parent.getNewConf();
+      Collection<ReconfigurationUtil.PropertyChange> changes =
+          this.parent.getChangedProperties(newConf, oldConf);
+      ConfigRedactor oldRedactor = new ConfigRedactor(oldConf);
+      ConfigRedactor newRedactor = new ConfigRedactor(newConf);
+
+      for (ReconfigurationUtil.PropertyChange change : changes) {
+        String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal);
+        String newValRedacted = newRedactor.redact(change.prop, change.newVal);
+        if (!this.parent.isPropertyReconfigurable(change.prop)) {
+          LOG.info("Property {} is not configurable: old value: {}, new value: {}",
+              change.prop, oldValRedacted, newValRedacted);
+          continue;
+        }
+
+        LOG.info("Change property: {} from \"{}\" to \"{}\".", change.prop,
+            change.oldVal == null ? "<default>" : oldValRedacted,
+            change.newVal == null ? "<default>" : newValRedacted);
+
+        String errorMessage = null;
+        try {
+          String effectiveValue = this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
+          if (change.newVal != null) {
+            oldConf.set(change.prop, effectiveValue);
+          } else {
+            oldConf.unset(change.prop);
+          }
+        } catch (ReconfigurationException reconfException) {
+          Throwable cause = reconfException.getCause();
+          errorMessage = cause == null ? reconfException.getMessage() : cause.getMessage();
+          LOG.error("Failed to reconfigure property {}: {}", change.prop, errorMessage, reconfException);
+        }
+
+        results.put(change, Optional.ofNullable(errorMessage));
+      }
+    }
+
+    /**
+     * Publishes the task outcome, clears the running state and, when the task
+     * finished normally, notifies the completion callbacks.
+     */
+    private void complete(Map<ReconfigurationUtil.PropertyChange, Optional<String>> results,
+        boolean finished) {
+      synchronized (this.parent.reconfigLock) {
+        this.parent.endTime = Time.now();
+        this.parent.status = Collections.unmodifiableMap(results);
+        this.parent.reconfigThread = null;
+
+        if (!finished) {
+          LOG.warn("Reconfiguration task terminated abnormally; recorded results may be incomplete.");
+          return;
+        }
+
+        // NOTE(review): callbacks run while reconfigLock is held, so a slow callback
+        // delays status queries and the start of the next task.
+        for (Consumer<ReconfigurationTaskStatus> callback : parent.reconfigurationCompleteCallbacks) {
+          try {
+            callback.accept(parent.getReconfigurationTaskStatus());
+          } catch (Exception e) {
+            LOG.warn("Reconfiguration complete callback threw exception", e);
+          }
+        }
+      }
+    }
+  }
+
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java
new file mode 100644
index 00000000000..810df7870d1
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.conf;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Callback interface to handle configuration changes after a reconfiguration task completes.
+ * ReconfigurationHandler invokes registered callbacks only when the finished task
+ * reported at least one property change.
+ */
+@FunctionalInterface
+public interface ReconfigurationChangeCallback {
+ /**
+ * Called with the properties reported by the finished reconfiguration task.
+ *
+ * @param changedKeys property name mapped to true when the property has a new
+ * value, or false when its new value is null (the property was removed)
+ * @param newConf the configuration ReconfigurationHandler obtained from getNewConf()
+ */
+ void onPropertiesChanged(Map<String, Boolean> changedKeys, Configuration
newConf);
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
index a594bfa2760..979525f7a1a 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
@@ -22,18 +22,22 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.hdds.protocol.ReconfigureProtocol;
import org.apache.ratis.util.function.CheckedConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Keeps track of reconfigurable properties and the corresponding functions
@@ -42,16 +46,64 @@
public class ReconfigurationHandler extends ReconfigurableBase
implements ReconfigureProtocol {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReconfigurationHandler.class);
private final String name;
private final CheckedConsumer<String, IOException> requireAdminPrivilege;
private final Map<String, UnaryOperator<String>> properties =
new ConcurrentHashMap<>();
+ private final List<ReconfigurationChangeCallback> completeCallbacks = new
ArrayList<>();
+ private BiConsumer<ReconfigurationTaskStatus, Configuration>
reconfigurationStatusListener;
+
+ /**
+ * Adds a callback that is notified with the changed property keys after a
+ * reconfiguration task has completed.
+ * NOTE(review): completeCallbacks is a plain ArrayList and this method takes no
+ * lock, while the list is iterated from the reconfiguration thread - confirm that
+ * registration cannot overlap with a running task.
+ *
+ * @param callback callback to add
+ */
+ public void registerCompleteCallback(ReconfigurationChangeCallback callback)
{
+ completeCallbacks.add(callback);
+ }
+
+ /**
+ * Sets the single status listener that is invoked after the change callbacks
+ * each time a reconfiguration task completes; a previously set listener is replaced.
+ *
+ * @param statusListener receives the task status and the new configuration; when
+ * null, no listener is invoked
+ */
+ public void
setReconfigurationCompleteCallback(BiConsumer<ReconfigurationTaskStatus,
Configuration>
+ statusListener) {
+ this.reconfigurationStatusListener = statusListener;
+ }
+
+ /**
+  * Builds a status listener that only logs the outcome of a reconfiguration task:
+  * the number of entries in the task status, or a message that nothing changed
+  * when the status map is null or empty.
+  *
+  * @return a listener suitable for setReconfigurationCompleteCallback
+  */
+ public BiConsumer<ReconfigurationTaskStatus, Configuration> defaultLoggingCallback() {
+   return (status, conf) -> {
+     Map<?, ?> results = status.getStatus();
+     if (results == null || results.isEmpty()) {
+       LOG.info("Reconfiguration complete. No properties were changed.");
+       return;
+     }
+     LOG.info("Reconfiguration completed with {} updated properties.", results.size());
+   };
+ }
+
+ /**
+  * Notifies the registered change callbacks and then the status listener about a
+  * finished reconfiguration task.
+  *
+  * <p>Only properties that were applied successfully (no error recorded in the task
+  * status) are reported as changed. A failing callback is logged and does not stop
+  * the remaining callbacks or the status listener from running.
+  *
+  * @param status final status of the reconfiguration task
+  * @param newConf configuration passed on to the callbacks and the listener
+  */
+ private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, Configuration newConf) {
+   if (status.getStatus() != null && !status.getStatus().isEmpty()) {
+     // property name -> true when it has a new value, false when it was removed
+     Map<String, Boolean> changedKeys = new HashMap<>();
+     status.getStatus().forEach((change, error) -> {
+       // An error message means the property kept its old value: not a change.
+       if (!error.isPresent()) {
+         changedKeys.put(change.prop, change.newVal != null);
+       }
+     });
+     if (!changedKeys.isEmpty()) {
+       for (ReconfigurationChangeCallback callback : completeCallbacks) {
+         try {
+           callback.onPropertiesChanged(changedKeys, newConf);
+         } catch (RuntimeException e) {
+           LOG.warn("Reconfiguration change callback threw exception", e);
+         }
+       }
+     }
+   }
+
+   if (reconfigurationStatusListener != null) {
+     reconfigurationStatusListener.accept(status, newConf);
+   }
+ }
+
public ReconfigurationHandler(String name, OzoneConfiguration config,
CheckedConsumer<String, IOException> requireAdminPrivilege) {
super(config);
this.name = name;
this.requireAdminPrivilege = requireAdminPrivilege;
+
+ // Register callback on reconfiguration complete
+ addReconfigurationCompleteCallback(status -> {
+ Configuration newConf = getNewConf();
+ triggerCompleteCallbacks(status, newConf);
+ });
+
}
public ReconfigurationHandler register(
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7773a91fec9..36b29c6a079 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -403,6 +403,8 @@ private StorageContainerManager(OzoneConfiguration conf,
.register(OZONE_READONLY_ADMINISTRATORS,
this::reconfOzoneReadOnlyAdmins);
+
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
+
initializeSystemManagers(conf, configurator);
if (isSecretKeyEnable(securityConfig)) {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
index 05e7e2f0f3e..55172f78f00 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
@@ -19,6 +19,7 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
@@ -51,6 +52,7 @@ void reconfigurableProperties() {
.add(OZONE_KEY_DELETING_LIMIT_PER_TASK)
.add(OZONE_OM_VOLUME_LISTALL_ALLOWED)
.add(OZONE_READONLY_ADMINISTRATORS)
+ .add(OZONE_DIR_DELETING_SERVICE_INTERVAL)
.addAll(new OmConfig().reconfigurableProperties())
.build();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
index b43536f5bd7..289044f89db 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
@@ -18,13 +18,18 @@
package org.apache.hadoop.ozone.shell;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import org.apache.hadoop.conf.ReconfigurableBase;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurableBase;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -33,7 +38,9 @@
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.admin.OzoneAdmin;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.apache.ozone.test.NonHATests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -47,6 +54,8 @@
public abstract class TestReconfigShell implements NonHATests.TestCase {
private OzoneAdmin ozoneAdmin;
+ private OzoneConfiguration conf;
+ private ReconfigurationHandler reconfigurationHandler;
private GenericTestUtils.PrintStreamCapturer out;
private GenericTestUtils.PrintStreamCapturer err;
@@ -55,6 +64,8 @@ void capture() {
out = GenericTestUtils.captureOut();
err = GenericTestUtils.captureErr();
ozoneAdmin = new OzoneAdmin();
+ conf = new OzoneConfiguration();
+ reconfigurationHandler =
cluster().getOzoneManager().getReconfigurationHandler();
}
@AfterEach
@@ -77,6 +88,35 @@ void testOzoneManagerGetReconfigurationProperties() {
executeAndAssertProperties(om.getReconfigurationHandler(), "OM", socket);
}
+ /**
+ * Verifies that a reconfiguration task started through the admin command applies
+ * the ozone-site.xml value of OZONE_DIR_DELETING_SERVICE_INTERVAL to the OM and
+ * that DirectoryDeletingService logs its restart with the new interval.
+ */
+ @Test
+ void testDirectoryDeletingServiceIntervalReconfiguration() throws
ReconfigurationException {
+ OzoneManager om = cluster().getOzoneManager();
+ InetSocketAddress socket = om.getOmRpcServerAddr();
+ LogCapturer logCapturer =
LogCapturer.captureLogs(DirectoryDeletingService.class);
+
+ String initialInterval = "1m";
+ String intervalFromXML = "2m"; //config value set in ozone-site.xml
+ long intervalFromXMLInSeconds = TimeUnit.MINUTES.toSeconds(2); //120 seconds
+
+ // Force a value that differs from ozone-site.xml so the task sees a change.
+
reconfigurationHandler.reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL,
initialInterval);
+
assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(initialInterval);
+
+ //Start the reconfiguration task
+ executeAndAssertStart("OM", socket);
+ //If config value is set in ozone-site.xml then it is picked up during reconfiguration
+
assertThat(conf.get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML);
+
+ // NOTE(review): the reconfiguration task runs on its own thread; the assertions
+ // below assume it has finished by the time the status is queried - confirm this
+ // cannot race.
+ executeAndAssertStatus("OM", socket);
+
assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML);
+ assertThat(out.get()).contains(
+ String.format("SUCCESS: Changed property %s",
OZONE_DIR_DELETING_SERVICE_INTERVAL)
+ );
+ assertThat(logCapturer.getOutput()).contains(
+ String.format("Updating and restarting DirectoryDeletingService with
interval: %d %s",
+ intervalFromXMLInSeconds, TimeUnit.SECONDS.name().toLowerCase())
+ );
+ }
+
@Test
void testStorageContainerManagerGetReconfigurationProperties() {
StorageContainerManager scm = cluster().getStorageContainerManager();
@@ -130,4 +170,17 @@ private void executeForInServiceDatanodes(int
expectedCount) {
private String getAddress(InetSocketAddress socket) {
return socket.getHostString() + ":" + socket.getPort();
}
+
+ /**
+  * Runs the {@code reconfig ... start} admin sub-command against the given node and
+  * asserts that the captured output reports the task as started.
+  *
+  * @param service service name passed to {@code --service}
+  * @param socket RPC address of the node to reconfigure
+  */
+ private void executeAndAssertStart(String service, InetSocketAddress socket) {
+   // Reuse the shared helper instead of rebuilding host:port by hand.
+   String address = getAddress(socket);
+   ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "start");
+   assertThat(out.get()).contains(service + ": Started reconfiguration task on node [" + address + "]");
+ }
+
+ /**
+  * Runs the {@code reconfig ... status} admin sub-command against the given node and
+  * asserts that the captured output contains the status line for that node.
+  *
+  * @param service service name passed to {@code --service}
+  * @param socket RPC address of the node to query
+  */
+ private void executeAndAssertStatus(String service, InetSocketAddress socket) {
+   // Reuse the shared helper instead of rebuilding host:port by hand.
+   String address = getAddress(socket);
+   ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "status");
+   assertThat(out.get()).contains(service + ": Reconfiguring status for node [" + address + "]: started");
+ }
+
}
diff --git a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
index 5ea2eb89dfa..2b07b1d060d 100644
--- a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
+++ b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
@@ -127,5 +127,17 @@
<name>ozone.client.datastream.window.size</name>
<value>8MB</value>
</property>
+ <property>
+ <name>ozone.readonly.administrators</name>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>ozone.administrators</name>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>ozone.directory.deleting.service.interval</name>
+ <value>2m</value>
+ </property>
</configuration>
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index ad1741df0a5..ec2b9964f53 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -52,6 +52,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT;
@@ -280,6 +281,7 @@
import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
import org.apache.hadoop.ozone.om.s3.S3SecretStoreProvider;
import org.apache.hadoop.ozone.om.service.CompactDBService;
+import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
@@ -494,6 +496,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
// instance creation every single time.
private UncheckedAutoCloseableSupplier<IOmMetadataReader> rcOmMetadataReader;
private OmSnapshotManager omSnapshotManager;
+ private volatile DirectoryDeletingService dirDeletingService;
@SuppressWarnings("methodlength")
private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
@@ -519,7 +522,10 @@ private OzoneManager(OzoneConfiguration conf,
StartupOption startupOption)
this::reconfOzoneReadOnlyAdmins)
.register(OZONE_OM_VOLUME_LISTALL_ALLOWED,
this::reconfigureAllowListAllVolumes)
.register(OZONE_KEY_DELETING_LIMIT_PER_TASK,
- this::reconfOzoneKeyDeletingLimitPerTask);
+ this::reconfOzoneKeyDeletingLimitPerTask)
+ .register(OZONE_DIR_DELETING_SERVICE_INTERVAL,
this::reconfOzoneDirDeletingServiceInterval);
+
+
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
versionManager = new OMLayoutVersionManager(omStorage.getLayoutVersion());
upgradeFinalizer = new OMUpgradeFinalizer(versionManager);
@@ -5148,6 +5154,11 @@ private String reconfigureAllowListAllVolumes(String
newVal) {
return String.valueOf(allowListAllVolumes);
}
+ /**
+  * Reconfiguration handler for OZONE_DIR_DELETING_SERVICE_INTERVAL: stores the new
+  * interval in the OM configuration.
+  *
+  * @param newVal new interval value, or null when the property was removed
+  * @return the value that was applied (null when the property was unset)
+  */
+ private String reconfOzoneDirDeletingServiceInterval(String newVal) {
+   if (newVal == null) {
+     // The reconfiguration task passes null for a removed property, and
+     // Configuration#set rejects null values, so drop the override instead.
+     getConfiguration().unset(OZONE_DIR_DELETING_SERVICE_INTERVAL);
+   } else {
+     getConfiguration().set(OZONE_DIR_DELETING_SERVICE_INTERVAL, newVal);
+   }
+   return newVal;
+ }
+
public void validateReplicationConfig(ReplicationConfig replicationConfig)
throws OMException {
try {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 7451032492e..ad90490101c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -17,6 +17,9 @@
package org.apache.hadoop.ozone.om.service;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
@@ -29,6 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
@@ -100,9 +104,28 @@ public DirectoryDeletingService(long interval, TimeUnit
unit,
this.isRunningOnAOS = new AtomicBoolean(false);
this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
deletedDirSupplier = new DeletedDirSupplier();
+ registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(),
configuration);
taskCount.set(0);
}
+ /**
+ * Registers a completion callback on the handler that restarts this service when
+ * OZONE_DIR_DELETING_SERVICE_INTERVAL is among the changed keys.
+ * The callback reads the interval from the conf passed here, not from the newConf
+ * argument it receives.
+ * NOTE(review): this is invoked from the constructor, and the callback is never
+ * unregistered in the code shown - confirm neither is a problem.
+ *
+ * @param handler reconfiguration handler to register with
+ * @param conf configuration the new interval is read from on restart
+ */
+ public void registerReconfigCallbacks(ReconfigurationHandler handler,
OzoneConfiguration conf) {
+ handler.registerCompleteCallback((changedKeys, newConf) -> {
+ if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) {
+ updateAndRestart(conf);
+ }
+ });
+ }
+
+ /**
+  * Reads the directory deleting interval (in seconds) from the given configuration,
+  * then shuts the service down, stores the new interval and starts it again.
+  *
+  * @param conf configuration to read OZONE_DIR_DELETING_SERVICE_INTERVAL from
+  */
+ private synchronized void updateAndRestart(OzoneConfiguration conf) {
+   final TimeUnit intervalUnit = TimeUnit.SECONDS;
+   final long updatedInterval = conf.getTimeDuration(
+       OZONE_DIR_DELETING_SERVICE_INTERVAL, OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, intervalUnit);
+   LOG.info("Updating and restarting DirectoryDeletingService with interval: {} {}",
+       updatedInterval, intervalUnit.name().toLowerCase());
+   // Stop with the old schedule before swapping in the new interval.
+   shutdown();
+   setInterval(updatedInterval, intervalUnit);
+   start();
+ }
+
private boolean shouldRun() {
if (getOzoneManager() == null) {
// OzoneManager can be null for testing
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]