michaeljmarshall commented on code in PR #17411:
URL: https://github.com/apache/pulsar/pull/17411#discussion_r997707881
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2361,8 +2361,11 @@ private void
maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null
- &&
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
!= null
- &&
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
>= 0) {
+ &&
((config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
!= null
+ &&
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
>= 0)
+ ||
(config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInMillis()
!= null
+ &&
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInMillis()
>= 0))
Review Comment:
I think the changes made in this file might belong in a separate PR. Would
you mind making a separate PR for them?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1089,12 +1091,25 @@ protected void handleSubscribe(final CommandSubscribe
subscribe) {
}
Optional<Map<String, String>> subscriptionProperties =
SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());
+
+ boolean createTopicIfDoesNotExist = forceTopicCreation ||
(isAuthorizedToCreateTopic
+ &&
service.isAllowAutoTopicCreation(topicName.toString()));
Review Comment:
I think we want the following in order to retain the existing logic:
```suggestion
boolean createTopicIfDoesNotExist =
isAuthorizedToCreateTopic || (forceTopicCreation
&&
service.isAllowAutoTopicCreation(topicName.toString()));
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1307,7 +1326,25 @@ protected void handleProducer(final CommandProducer
cmdProducer) {
producerId, schema == null ? "absent" : "present");
}
- service.getOrCreateTopic(topicName.toString()).thenCompose((Topic
topic) -> {
+ boolean createTopicIfDoesNotExist = isAuthorizedToCreateTopic
+ && service.isAllowAutoTopicCreation(topicName.toString());
+ service.getTopic(topicName.toString(),
createTopicIfDoesNotExist).thenCompose(optTopic -> {
+ if (optTopic.isEmpty()) {
+ if (isAuthorizedToCreateTopic) {
+ return FutureUtil
+ .failedFuture(new TopicNotFoundException(
+ "Topic " + topicName + " does not
exist"));
+ } else {
+ String msg = "Topic to produce does not exists and the
Client is not"
+ + " authorized to create topic";
+ log.warn("[{}] {} with role {}", remoteAddress, msg,
getPrincipal());
+ ctx.writeAndFlush(Commands.newError(requestId,
ServerError.AuthorizationError,
+ msg));
+ return null;
Review Comment:
Similarly, we need to throw an exception here. The error handling includes
handling the dangling `producer` future, and we need to do that to prevent a
build up of unnecessary futures.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1089,12 +1091,25 @@ protected void handleSubscribe(final CommandSubscribe
subscribe) {
}
Optional<Map<String, String>> subscriptionProperties =
SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());
+
+ boolean createTopicIfDoesNotExist = forceTopicCreation ||
(isAuthorizedToCreateTopic
+ &&
service.isAllowAutoTopicCreation(topicName.toString()));
service.getTopic(topicName.toString(),
createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
- return FutureUtil
- .failedFuture(new
TopicNotFoundException(
- "Topic " + topicName + " does
not exist"));
+ if (isAuthorizedToCreateTopic) {
+ return FutureUtil
+ .failedFuture(new
TopicNotFoundException(
+ "Topic " + topicName + "
does not exist"));
+ } else {
+ String msg = "Topic to subscribe does not
exists and the Client is not"
+ + " authorized to create topic";
+ log.warn("[{}] {} with role {}",
remoteAddress, msg, getPrincipal());
+ consumers.remove(consumerId,
consumerFuture);
+
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError,
+ msg));
+ return null;
Review Comment:
I don't think returning `null` here will give the desired behavior unless we
modify the `thenAccept` block. It seems like the current paradigm in this
section of the code is to throw an exception, so I think it would be
appropriate to do that here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]