This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 026de443b71 [fix][broker] Run ResourceGroup tasks only when
tenants/namespaces registered (#24859)
026de443b71 is described below
commit 026de443b7134e3c15b1e9438204035cd0c2cc59
Author: Vinkal <[email protected]>
AuthorDate: Fri Oct 31 17:01:25 2025 +0530
[fix][broker] Run ResourceGroup tasks only when tenants/namespaces
registered (#24859)
Signed-off-by: Vinkal Chudgar <[email protected]>
(cherry picked from commit 06f7142d32d66a31d05cbe37d57e8bcaf44dce07)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 7 +-
.../broker/resourcegroup/ResourceGroupService.java | 113 ++++++--
.../resourcegroup/ResourceGroupServiceTest.java | 302 ++++++++++++++++++++-
3 files changed, 396 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6cfc1d4f7dc..99682bfbb33 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1134,7 +1134,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
- doc = "Default interval to publish usage reports if
resourceUsagePublishToTopic is enabled."
+ doc = "Interval (in seconds) for ResourceGroupService periodic
tasks while resource groups are actively "
+ + "attached to tenants or namespaces. Periodic tasks start
automatically when the first attachment "
+ + "is registered and stop automatically when no
attachments remain. "
+ + "If a ResourceUsageTransportManager is configured (see
resourceUsageTransportClassName), "
+ + "this interval also controls how frequently, usage
reports are published for cross-broker "
+ + "coordination. Dynamic changes take effect at runtime
and reschedule any running tasks."
)
private int resourceUsageTransportPublishIntervalInSecs = 60;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 29633ab19fe..379f9f870ea 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -213,6 +213,9 @@ public class ResourceGroupService implements AutoCloseable{
// Associate this tenant name with the RG.
this.tenantToRGsMap.put(tenantName, rg);
rgTenantRegisters.labels(resourceGroupName).inc();
+
+ // Ensure schedulers are started if this is the first registration.
+ maybeStartSchedulers();
}
/**
@@ -235,6 +238,9 @@ public class ResourceGroupService implements AutoCloseable{
// Dissociate this tenant name from the RG.
this.tenantToRGsMap.remove(tenantName, rg);
rgTenantUnRegisters.labels(resourceGroupName).inc();
+
+ // If this was the last registration (tenant or namespace), stop
schedulers.
+ maybeStopSchedulersIfIdle();
}
/**
@@ -266,6 +272,9 @@ public class ResourceGroupService implements AutoCloseable{
// Associate this NS-name with the RG.
this.namespaceToRGsMap.put(fqNamespaceName, rg);
rgNamespaceRegisters.labels(resourceGroupName).inc();
+
+ // Ensure schedulers are started if this is the first registration.
+ maybeStartSchedulers();
}
/**
@@ -290,6 +299,9 @@ public class ResourceGroupService implements AutoCloseable{
// Dissociate this NS-name from the RG.
this.namespaceToRGsMap.remove(fqNamespaceName, rg);
rgNamespaceUnRegisters.labels(resourceGroupName).inc();
+
+ // If this was the last registration (tenant or namespace), stop
schedulers.
+ maybeStopSchedulersIfIdle();
}
/**
@@ -306,10 +318,16 @@ public class ResourceGroupService implements
AutoCloseable{
public void close() throws Exception {
if (aggregateLocalUsagePeriodicTask != null) {
aggregateLocalUsagePeriodicTask.cancel(true);
+ aggregateLocalUsagePeriodicTask = null;
}
if (calculateQuotaPeriodicTask != null) {
calculateQuotaPeriodicTask.cancel(true);
+ calculateQuotaPeriodicTask = null;
}
+
+ // Ensure the flag is consistent with the stopped state.
+ schedulersRunning.set(false);
+
resourceGroupsMap.clear();
tenantToRGsMap.clear();
namespaceToRGsMap.clear();
@@ -540,6 +558,9 @@ public class ResourceGroupService implements AutoCloseable{
// Periodically aggregate the usage from all topics known to the
BrokerService.
// Visibility for unit testing.
protected void aggregateResourceGroupLocalUsages() {
+ if (!shouldRunPeriodicTasks()) {
+ return;
+ }
final Summary.Timer aggrUsageTimer =
rgUsageAggregationLatency.startTimer();
BrokerService bs = this.pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
@@ -578,7 +599,7 @@ public class ResourceGroupService implements AutoCloseable{
// cancel and re-schedule this task if the period of execution has
changed.
ServiceConfiguration config = pulsar.getConfiguration();
long newPeriodInSeconds =
config.getResourceUsageTransportPublishIntervalInSecs();
- if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
+ if (schedulersRunning.get() && newPeriodInSeconds !=
this.aggregateLocalUsagePeriodInSeconds) {
if (this.aggregateLocalUsagePeriodicTask == null) {
log.error("aggregateResourceGroupLocalUsages: Unable to find
running task to cancel when "
+ "publish period changed from {} to {} {}",
@@ -602,6 +623,9 @@ public class ResourceGroupService implements AutoCloseable{
// from the reports received from other brokers.
// [Visibility for unit testing.]
protected void calculateQuotaForAllResourceGroups() {
+ if (!shouldRunPeriodicTasks()) {
+ return;
+ }
// Calculate the quota for the next window for this RG, based on the
observed usage.
final Summary.Timer quotaCalcTimer =
rgQuotaCalculationLatency.startTimer();
BytesAndMessagesCount updatedQuota = new BytesAndMessagesCount();
@@ -668,7 +692,7 @@ public class ResourceGroupService implements AutoCloseable{
// cancel and re-schedule this task if the period of execution has
changed.
ServiceConfiguration config = pulsar.getConfiguration();
long newPeriodInSeconds =
config.getResourceUsageTransportPublishIntervalInSecs();
- if (newPeriodInSeconds != this.resourceUsagePublishPeriodInSeconds) {
+ if (schedulersRunning.get() && newPeriodInSeconds !=
this.resourceUsagePublishPeriodInSeconds) {
if (this.calculateQuotaPeriodicTask == null) {
log.error("calculateQuotaForAllResourceGroups: Unable to find
running task to cancel when "
+ "publish period changed from {} to {} {}",
@@ -690,23 +714,72 @@ public class ResourceGroupService implements
AutoCloseable{
}
}
- private void initialize() {
- ServiceConfiguration config = this.pulsar.getConfiguration();
- long periodInSecs =
config.getResourceUsageTransportPublishIntervalInSecs();
- this.aggregateLocalUsagePeriodInSeconds =
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
- this.aggregateLocalUsagePeriodicTask =
this.pulsar.getExecutor().scheduleAtFixedRate(
+ // Returns true if at least one tenant or namespace is registered to a
resource group.
+ private boolean hasActiveResourceGroups() {
+ return !tenantToRGsMap.isEmpty() || !namespaceToRGsMap.isEmpty();
+ }
+
+ /**
+ * Whether the periodic ResourceGroupService tasks (aggregation & quota
calculation) should run.
+ * True only when:
+ * 1. the scheduler flag is set,
+ * 2. at least one Resource Group exists locally, and
+ * 3. at least one tenant or namespace is registered to a Resource Group.
+ */
+ private boolean shouldRunPeriodicTasks() {
+ return schedulersRunning.get()
+ && !resourceGroupsMap.isEmpty()
+ && hasActiveResourceGroups();
+ }
+
+ // Start periodic aggregation/quota schedulers if we actually need them.
+ private void maybeStartSchedulers() {
+ if (!hasActiveResourceGroups()) {
+ return;
+ }
+ if (schedulersRunning.compareAndSet(false, true)) {
+ final long periodInSecs =
pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
+ this.aggregateLocalUsagePeriodInSeconds =
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
+ this.aggregateLocalUsagePeriodicTask =
pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
- periodInSecs,
- periodInSecs,
- this.timeUnitScale);
- this.calculateQuotaPeriodicTask =
this.pulsar.getExecutor().scheduleAtFixedRate(
+ periodInSecs, periodInSecs, timeUnitScale);
+ this.calculateQuotaPeriodicTask =
pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups),
- periodInSecs,
- periodInSecs,
- this.timeUnitScale);
- maxIntervalForSuppressingReportsMSecs =
-
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) *
MaxUsageReportSuppressRounds;
+ periodInSecs, periodInSecs, timeUnitScale);
+ maxIntervalForSuppressingReportsMSecs =
+
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) *
MaxUsageReportSuppressRounds;
+ if (log.isInfoEnabled()) {
+ log.info("Started ResourceGroupService periodic tasks with
period={} {}", periodInSecs, timeUnitScale);
+ }
+ }
+ }
+ // Stop schedulers when no tenant or namespace registrations remain.
+ private void maybeStopSchedulersIfIdle() {
+ if (hasActiveResourceGroups()) {
+ return;
+ }
+ if (schedulersRunning.compareAndSet(true, false)) {
+ if (aggregateLocalUsagePeriodicTask != null) {
+ aggregateLocalUsagePeriodicTask.cancel(true);
+ aggregateLocalUsagePeriodicTask = null;
+ }
+ if (calculateQuotaPeriodicTask != null) {
+ calculateQuotaPeriodicTask.cancel(true);
+ calculateQuotaPeriodicTask = null;
+ }
+ if (log.isInfoEnabled()) {
+ log.info("Stopped ResourceGroupService periodic tasks because
no registrations remain");
+ }
+ }
+ }
+ private void initialize() {
+ // Store the configured interval. Do not start periodic tasks
unconditionally here.
+ // Schedulers are started by maybeStartSchedulers() when the first
tenant/namespace is registered.
+ final long periodInSecs =
pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
+ this.aggregateLocalUsagePeriodInSeconds =
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
+ // If any tenant/namespace registrations already exist,
maybeStartSchedulers() will start the schedulers now.
+ maybeStartSchedulers();
}
private void checkRGCreateParams(String rgName,
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
@@ -761,6 +834,9 @@ public class ResourceGroupService implements AutoCloseable{
// Allow a pluggable scale on time units; for testing periodic
functionality.
private TimeUnit timeUnitScale;
+ private final java.util.concurrent.atomic.AtomicBoolean schedulersRunning =
+ new java.util.concurrent.atomic.AtomicBoolean(false);
+
// The maximum number of successive rounds that we can suppress reporting
local usage, because there was no
// substantial change from the prior round. This is to ensure the
reporting does not become too chatty.
// Set this value to one more than the cadence of sending reports; e.g.,
if you want to send every 3rd report,
@@ -864,4 +940,9 @@ public class ResourceGroupService implements AutoCloseable{
ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
return this.calculateQuotaPeriodicTask;
}
+
+ @VisibleForTesting
+ boolean isSchedulersRunning() {
+ return schedulersRunning.get();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
index f47f7686743..64b9e5d1272 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
@@ -229,9 +229,6 @@ public class ResourceGroupServiceTest extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(myBoolSet.contains(false));
}
- rgs.unRegisterTenant(rgName, tenantName);
- rgs.unRegisterNameSpace(rgName, tenantAndNamespace);
-
BytesAndMessagesCount publishQuota =
rgs.getPublishRateLimiters(rgName);
// Calculated quota is synthetically set to the number of
quota-calculation callbacks.
@@ -242,7 +239,7 @@ public class ResourceGroupServiceTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(publishQuota.bytes,
rgConfig.getPublishRateInBytes().longValue());
}
- // Calculate the quota synchronously to avoid waiting for a periodic
call within ResourceGroupService.
+ // Calculate the quota synchronously while an attachment still exists
rgs.calculateQuotaForAllResourceGroups();
publishQuota = rgs.getPublishRateLimiters(rgName);
// The bytes/messages are (synthetically) set from
numAnonymousQuotaCalculations in the above round of
@@ -251,18 +248,57 @@ public class ResourceGroupServiceTest extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(publishQuota.messages > 0 && publishQuota.messages
<= numAnonymousQuotaCalculations);
Assert.assertTrue(publishQuota.bytes > 0 && publishQuota.bytes <=
numAnonymousQuotaCalculations);
+ // Now it is safe to detach. After this point the service is
intentionally idle.
+ rgs.unRegisterTenant(rgName, tenantName);
+ rgs.unRegisterNameSpace(rgName, tenantAndNamespace);
+
rgs.resourceGroupDelete(rgName);
Assert.assertThrows(PulsarAdminException.class, () ->
rgs.getPublishRateLimiters(rgName));
Assert.assertEquals(rgs.getNumResourceGroups(), 0);
}
- @Test
+ /**
+ * Validates that ResourceGroupService#close() cancels scheduled tasks,
clears futures and state.
+ * Steps:
+ * 1) Start periodic tasks by creating a resource group and attaching a
namespace.
+ * 2) Assert both futures are non-null (tasks are scheduled) and the
schedulersRunning flag is true.
+ * 3) Let try-with-resources close the service, then assert both futures
are null, schedulersRunning is false,
+ * and the resource group map is cleared.
+ */
+ @Test(timeOut = 60000)
public void testClose() throws Exception {
- ResourceGroupService service = new ResourceGroupService(pulsar,
TimeUnit.MILLISECONDS, null, null);
- service.close();
-
Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled());
-
Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled());
+ final String rg = "rg-close";
+ final NamespaceName ns = NamespaceName.get("t-close/ns-close");
+
+ // Create the service once so we can assert state after the
try-with-resources closes it.
+ ResourceGroupService service = createResourceGroupService();
+ try (ResourceGroupService ignored = service) {
+ // Start the periodic tasks: create RG and attach a namespace.
+ service.resourceGroupCreate(rg, new
org.apache.pulsar.common.policies.data.ResourceGroup());
+ service.registerNameSpace(rg, ns);
+
+ // Ensure tasks are started
+ Assert.assertNotNull(service.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should be scheduled.");
+ Assert.assertNotNull(service.getCalculateQuotaPeriodicTask(),
+ "Quota task should be scheduled.");
+ Assert.assertTrue(service.isSchedulersRunning(),
+ "SchedulersRunning flag should be true when tasks are
active.");
+
+ // Do not call service.close() here. The try-with-resources will
close it and ensure cleanup.
+ }
+
+ // After the try-with-resources block, service.close() has been
invoked automatically.
+ // Postconditions: futures cleared to null, internal state cleared,
flag off.
+ Assert.assertNull(service.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task future must be null after close().");
+ Assert.assertNull(service.getCalculateQuotaPeriodicTask(),
+ "Quota task future must be null after close().");
+ Assert.assertEquals(service.getNumResourceGroups(), 0,
+ "Resource group map should be cleared by close().");
+ Assert.assertFalse(service.isSchedulersRunning(),
+ "SchedulersRunning flag should be false after close().");
}
private ResourceGroupService rgs;
@@ -275,4 +311,252 @@ public class ResourceGroupServiceTest extends
MockedPulsarServiceBaseTest {
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
}
+ /**
+ * Helper method to create a fresh ResourceGroupService instance for
testing.
+ * Each test should create its own instance to ensure isolation.
+ */
+ private ResourceGroupService createResourceGroupService() {
+ return new ResourceGroupService(pulsar);
+ }
+
+ /**
+ * Validates the lazy scheduling lifecycle and deterministic rescheduling
of ResourceGroupService.
+ * Asserts that:
+ * 1) On cold start, and after creating a Resource Group (RG) without any
attachments,
+ * no periodic tasks are scheduled.
+ * 2) Registering the first attachment (tenant or namespace) starts both
periodic tasks.
+ * 3) Updating the publish interval causes rescheduling
+ * - calling aggregateResourceGroupLocalUsages() reschedules only the
aggregation task;
+ * - calling calculateQuotaForAllResourceGroups() reschedules only the
quota-calculation task.
+ * 4) When the last attachment is unregistered (i.e., no tenants or
namespaces remain attached to any RG),
+ * both periodic tasks are cancelled and their ScheduledFuture fields
are cleared.
+ */
+
+ @Test(timeOut = 60000)
+ public void testLazyStartStopAndReschedule() throws Exception {
+ final String rgName = "rg-lazy";
+ final NamespaceName ns = NamespaceName.get("t-lazy/ns-lazy");
+ final int oldInterval =
this.conf.getResourceUsageTransportPublishIntervalInSecs();
+
+ try (ResourceGroupService rgs = createResourceGroupService()) {
+ // Cold start: nothing scheduled
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should be null on cold start.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should be null on cold start.");
+
+ // Create a resource group but do not attach yet. There should
still be nothing scheduled.
+ rgs.resourceGroupCreate(rgName, new
org.apache.pulsar.common.policies.data.ResourceGroup());
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should remain null without attachments.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should remain null without attachments.");
+
+ // Attach a namespace. Both schedulers must start.
+ rgs.registerNameSpace(rgName, ns);
+ Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should start on first attachment.");
+ Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should start on first attachment.");
+
Assert.assertFalse(rgs.getAggregateLocalUsagePeriodicTask().isCancelled(),
+ "Aggregate task should be running.");
+
Assert.assertFalse(rgs.getCalculateQuotaPeriodicTask().isCancelled(),
+ "Quota task should be running.");
+
+ // Capture current scheduled futures
+ java.util.concurrent.ScheduledFuture<?> oldAgg =
rgs.getAggregateLocalUsagePeriodicTask();
+ java.util.concurrent.ScheduledFuture<?> oldCalc =
rgs.getCalculateQuotaPeriodicTask();
+
+ // Change publish interval dynamically
+ int newPeriod = oldInterval + 1;
+
this.conf.setResourceUsageTransportPublishIntervalInSecs(newPeriod);
+
+ // Trigger aggregate reschedule
+ rgs.aggregateResourceGroupLocalUsages();
+ java.util.concurrent.ScheduledFuture<?> midAgg =
rgs.getAggregateLocalUsagePeriodicTask();
+ java.util.concurrent.ScheduledFuture<?> midCalc =
rgs.getCalculateQuotaPeriodicTask();
+
+ Assert.assertNotSame(midAgg, oldAgg, "Aggregate task should be
rescheduled with a new future.");
+ Assert.assertTrue(oldAgg.isCancelled(), "Old aggregate task should
be cancelled on reschedule.");
+ Assert.assertSame(midCalc, oldCalc, "Quota task should not be
rescheduled by aggregate path.");
+ Assert.assertFalse(oldCalc.isCancelled(),
+ "Old quota task should still be active before its own
reschedule.");
+ Assert.assertFalse(midAgg.isCancelled(), "New aggregate task
should be active.");
+
+ // Now trigger calculate reschedule
+ rgs.calculateQuotaForAllResourceGroups();
+ java.util.concurrent.ScheduledFuture<?> newAgg =
rgs.getAggregateLocalUsagePeriodicTask();
+ java.util.concurrent.ScheduledFuture<?> newCalc =
rgs.getCalculateQuotaPeriodicTask();
+
+ Assert.assertSame(newAgg, midAgg,
+ "Aggregate task was already rescheduled. Future should
remain the same.");
+ Assert.assertNotSame(newCalc, oldCalc, "Quota task should be
rescheduled with a new future.");
+ Assert.assertTrue(oldCalc.isCancelled(), "Old quota task should be
cancelled on its reschedule.");
+ Assert.assertFalse(newCalc.isCancelled(), "New quota task should
be active.");
+
+ // Detach the last attachment. Schedulers must stop and futures
should be cleared.
+ rgs.unRegisterNameSpace(rgName, ns);
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should be cleared after last detach.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should be cleared after last detach.");
+
+ // Cleanup resource group
+ rgs.resourceGroupDelete(rgName);
+ Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group
should be deleted.");
+ } finally {
+
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval);
+ }
+ }
+
+ /**
+ * Verifies that creating a ResourceGroup without tenant or namespace
attachments
+ * does NOT trigger scheduler initialization. This ensures the lazy-start
optimization
+ * correctly avoids unnecessary overhead when ResourceGroups are
configured but unused.
+ */
+ @Test(timeOut = 60000)
+ public void testNoStartOnRGCreateOnly() throws Exception {
+ final String rg = "rg-create-only";
+ final int oldInterval =
this.conf.getResourceUsageTransportPublishIntervalInSecs();
+
+ try (ResourceGroupService rgs = createResourceGroupService()) {
+ // No tasks at cold start
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should be null at cold start.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task
should be null at cold start.");
+
+ // Create resource group without attachments. There should still
be nothing scheduled.
+ rgs.resourceGroupCreate(rg, new
org.apache.pulsar.common.policies.data.ResourceGroup());
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should remain null without attachments.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should remain null without attachments.");
+
+ // Calling periodic methods directly must not create schedulers
+ rgs.aggregateResourceGroupLocalUsages();
+ rgs.calculateQuotaForAllResourceGroups();
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should remain null after direct calls.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should remain null after direct calls.");
+
+ // Dynamic config while stopped must not trigger scheduling
+
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval + 7);
+ rgs.aggregateResourceGroupLocalUsages();
+ rgs.calculateQuotaForAllResourceGroups();
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should remain null after config change
while stopped.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should remain null after config change while
stopped.");
+
+ // Cleanup resource group
+ rgs.resourceGroupDelete(rg);
+ Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group
should be deleted.");
+ } finally {
+
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval);
+ }
+ }
+
+ /**
+ * Validates that attaching a tenant (without namespace attachment) to a
ResourceGroup
+ * triggers scheduler initialization, and that detaching the last tenant
stops the schedulers.
+ */
+ @Test(timeOut = 60000)
+ public void testStartOnTenantAttachment() throws Exception {
+ final String rg = "rg-tenant-only";
+ final String tenant = "t-attach";
+
+ try (ResourceGroupService rgs = createResourceGroupService()) {
+ rgs.resourceGroupCreate(rg, new
org.apache.pulsar.common.policies.data.ResourceGroup());
+ rgs.registerTenant(rg, tenant);
+
+ Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should start on first tenant attachment.");
+ Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should start on first tenant attachment.");
+
Assert.assertFalse(rgs.getAggregateLocalUsagePeriodicTask().isCancelled(),
+ "Aggregate task should be running.");
+
Assert.assertFalse(rgs.getCalculateQuotaPeriodicTask().isCancelled(),
+ "Quota task should be running.");
+
+ // Detach and ensure schedulers stop
+ rgs.unRegisterTenant(rg, tenant);
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should be cleared after last detach.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should be cleared after last detach.");
+
+ rgs.resourceGroupDelete(rg);
+ Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group
should be deleted.");
+ }
+ }
+
+ /**
+ * Tests scheduler lifecycle when a ResourceGroup has both tenant and
namespace attachments.
+ * Verifies that schedulers remain active as long as ANY attachment
exists, and only stop
+ * when the last attachment (tenant or namespace) is removed.
+ */
+ @Test(timeOut = 60000)
+ public void testStopOnLastDetachWithMixedRefs() throws Exception {
+ final String rg = "rg-mixed";
+ final String tenant = "t-mixed";
+ final NamespaceName ns = NamespaceName.get("t-mixed/ns1");
+
+ try (ResourceGroupService rgs = createResourceGroupService()) {
+ rgs.resourceGroupCreate(rg, new
org.apache.pulsar.common.policies.data.ResourceGroup());
+
+ // Attach both a tenant and a namespace
+ rgs.registerTenant(rg, tenant);
+ rgs.registerNameSpace(rg, ns);
+
+ Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(),
"Aggregate task should be started.");
+ Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(), "Quota
task should be started.");
+
+ // Remove one reference. Tasks should still be present.
+ rgs.unRegisterTenant(rg, tenant);
+ Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should remain with remaining namespace
attachment.");
+ Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should remain with remaining namespace
attachment.");
+
+ // Remove last reference. Tasks should stop.
+ rgs.unRegisterNameSpace(rg, ns);
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+ "Aggregate task should be cleared after last detach.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+ "Quota task should be cleared after last detach.");
+
+ rgs.resourceGroupDelete(rg);
+ Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group
should be deleted.");
+ }
+ }
+
+ /**
+ * Ensures that dynamic configuration changes to the publish interval do
NOT cause
+ * scheduler initialization when no attachments exist. This validates the
guard logic
+ * that prevents spurious rescheduling attempts on stopped schedulers.
+ */
+ @Test(timeOut = 60000)
+ public void testNoRescheduleWhenStopped() throws Exception {
+ final int oldInterval =
this.conf.getResourceUsageTransportPublishIntervalInSecs();
+
+ try (ResourceGroupService rgs = createResourceGroupService()) {
+ // Ensure stopped state
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
"Aggregate task should be null.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task
should be null.");
+
+ // Change interval while stopped
+
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval + 13);
+
+ // Call both periodic methods directly. Futures must remain null.
+ rgs.aggregateResourceGroupLocalUsages();
+ rgs.calculateQuotaForAllResourceGroups();
+ Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
"Aggregate task should remain null.");
+ Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task
should remain null.");
+ } finally {
+
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval);
+ }
+ }
+
+
}
\ No newline at end of file