eolivelli commented on a change in pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#discussion_r571019653
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -856,172 +764,52 @@ protected void internalSetSubscriptionExpirationTime(int
expirationTime) {
if (expirationTime < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value
for subscription expiration time");
}
-
- Entry<Policies, Stat> policiesNode = null;
-
- try {
- // Force to read the data s.t. the watch to the cache content is
setup.
- policiesNode = policiesCache().getWithStat(path(POLICIES,
namespaceName.toString())).orElseThrow(
- () -> new RestException(Status.NOT_FOUND, "Namespace " +
namespaceName + " does not exist"));
- policiesNode.getKey().subscription_expiration_time_minutes =
expirationTime;
-
- // Write back the new policies into zookeeper
- globalZk().setData(path(POLICIES, namespaceName.toString()),
- jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
- policiesCache().invalidate(path(POLICIES,
namespaceName.toString()));
-
- log.info("[{}] Successfully updated the subscription expiration
time on namespace {}", clientAppId(),
- namespaceName);
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update the subscription expiration time
for namespace {}: does not exist",
- clientAppId(), namespaceName);
- throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
- } catch (KeeperException.BadVersionException e) {
- log.warn(
- "[{}] Failed to update the subscription expiration time on"
- + " namespace {} expected policy node version={} :
concurrent modification",
- clientAppId(), namespaceName,
policiesNode.getValue().getVersion());
- throw new RestException(Status.CONFLICT, "Concurrent
modification");
- } catch (Exception e) {
- log.error("[{}] Failed to update the subscription expiration time
on namespace {}", clientAppId(),
- namespaceName, e);
- throw new RestException(e);
- }
+ updatePolicies(path(POLICIES, namespaceName.toString()), (policies) ->
{
+ policies.subscription_expiration_time_minutes = expirationTime;
+ return policies;
+ });
}
protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
AutoTopicCreationOverride
autoTopicCreationOverride) {
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
-
- if
(!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
- throw new RestException(Status.PRECONDITION_FAILED, "Invalid
configuration for autoTopicCreationOverride");
- }
- if (maxPartitions > 0 &&
autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
- throw new RestException(Status.NOT_ACCEPTABLE,
- "Number of partitions should be less than or equal to " +
maxPartitions);
+ if (autoTopicCreationOverride != null) {
Review comment:
this `if` statement was not present before, can it be `null` here ?
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -253,4 +255,23 @@ public void accept(Notification t) {
break;
}
}
+
+ private CompletableFuture<Void>
executeWithRetry(Supplier<CompletableFuture<Void>> op, String key) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ op.get().thenAccept(r -> result.complete(null)).exceptionally((ex) -> {
+ if (ex.getCause() instanceof BadVersionException) {
+ // if resource is updated by other than metadata-cache then
metadata-cache will get bad-version
+ // exception. so, try to invalidate the cache and try one more
time.
+ objCache.synchronous().invalidate(key);
Review comment:
can we call here this `synchronous` operation ?
do we risk a deadlock ?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
##########
@@ -2041,4 +2044,9 @@ public void testCompactionStatus() throws Exception {
assertTrue(admin.topics().compactionStatus(topicName)
.lastError.contains("Failed at something"));
}
+
+ private void clearCache() {
+ // (MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources().
Review comment:
nit: remove comment
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]