This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5eb0ec943f3 [fix][sec] Add a check for the input time value (#22023)
5eb0ec943f3 is described below
commit 5eb0ec943f31e3f7c3ed6fde294f3b00f749dae2
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Feb 26 23:40:02 2024 +0800
[fix][sec] Add a check for the input time value (#22023)
---
.../main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java | 5 ++++-
.../bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java | 2 ++
.../org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java | 2 ++
.../src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java | 2 ++
.../pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java | 2 ++
.../java/org/apache/pulsar/compaction/CompactionRetentionTest.java | 4 +++-
.../main/java/org/apache/pulsar/client/api/ClientConfiguration.java | 1 +
.../java/org/apache/pulsar/client/api/ConsumerConfiguration.java | 1 +
.../apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java | 4 ++++
.../org/apache/pulsar/client/admin/internal/TransactionsImpl.java | 1 +
.../main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java | 1 +
.../main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java | 2 ++
.../src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 4 ++++
.../main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java | 1 +
.../org/apache/pulsar/client/impl/ControlledClusterFailover.java | 1 +
.../main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java | 1 +
.../org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java | 2 ++
.../java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java | 1 +
.../pulsar/client/impl/transaction/TransactionBuilderImpl.java | 2 ++
.../src/main/java/org/apache/pulsar/client/util/ObjectCache.java | 2 ++
.../main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java | 2 ++
.../main/java/org/apache/bookkeeper/client/TestStatsProvider.java | 2 ++
22 files changed, 43 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 0c93a5b642c..6ee9c2f9492 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -170,6 +170,7 @@ public class ManagedLedgerConfig {
* the time unit
*/
public void setMinimumRolloverTime(int minimumRolloverTime, TimeUnit unit)
{
+ checkArgument(minimumRolloverTime >= 0);
this.minimumRolloverTimeMs = (int) unit.toMillis(minimumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Minimum rollover time needs to be less than maximum rollover
time");
@@ -195,6 +196,7 @@ public class ManagedLedgerConfig {
* the time unit
*/
public void setMaximumRolloverTime(int maximumRolloverTime, TimeUnit unit)
{
+ checkArgument(maximumRolloverTime >= 0);
this.maximumRolloverTimeMs = unit.toMillis(maximumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Maximum rollover time needs to be greater than minimum
rollover time");
@@ -411,7 +413,8 @@ public class ManagedLedgerConfig {
* time unit for retention time
*/
public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit
unit) {
- this.retentionTimeMs = unit.toMillis(retentionTime);
+ checkArgument(retentionTime >= -1, "The retention time should be -1, 0
or value > 0");
+ this.retentionTimeMs = retentionTime != -1 ?
unit.toMillis(retentionTime) : -1;
return this;
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
index cf3d7142d61..5a6bc8017b7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
@@ -41,6 +42,7 @@ public class ManagedLedgerFactoryMBeanImpl implements
ManagedLedgerFactoryMXBean
}
public void refreshStats(long period, TimeUnit unit) {
+ checkArgument(period >= 0);
double seconds = unit.toMillis(period) / 1000.0;
if (seconds <= 0.0) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index e057dee9953..7884add9552 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -63,6 +64,7 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
}
public void refreshStats(long period, TimeUnit unit) {
+ checkArgument(period >= 0);
double seconds = unit.toMillis(period) / 1000.0;
if (seconds <= 0.0) {
// skip refreshing stats
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java
index 1b6f981ca4e..54965e4c783 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;
+import static com.google.common.base.Preconditions.checkArgument;
import static io.prometheus.client.CollectorRegistry.defaultRegistry;
import io.prometheus.client.Collector;
import io.prometheus.client.Summary;
@@ -70,6 +71,7 @@ public class DimensionStats {
}
public void recordDimensionTimeValue(long latency, TimeUnit unit) {
+ checkArgument(latency >= 0);
summary.observe(unit.toMillis(latency));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java
index 8ade2bc883f..c2816f5a2a0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.stats.Counter;
@@ -57,6 +58,7 @@ public class LongAdderCounter implements Counter {
@Override
public void addLatency(long eventLatency, TimeUnit unit) {
+ checkArgument(eventLatency >= 0);
long valueMillis = unit.toMillis(eventLatency);
counter.add(valueMillis);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 98bf2b819c2..88d923f74e1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -257,7 +257,9 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
ManagedLedgerConfig config = pulsar.getBrokerService()
.getManagedLedgerConfig(TopicName.get(topicName)).get();
Assert.assertEquals(config.getRetentionSizeInMB(),
retentionPolicies.getRetentionSizeInMB());
-
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes()
* 60000L);
+ Assert.assertEquals(config.getRetentionTimeMillis(),
retentionPolicies.getRetentionTimeInMinutes() < 0
+ ? retentionPolicies.getRetentionTimeInMinutes()
+ : retentionPolicies.getRetentionTimeInMinutes() *
60000L);
}
private void testCompactionCursorRetention(String topic) throws Exception {
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index 3b0efe64cf5..ea2bba166e6 100644
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -368,6 +368,7 @@ public class ClientConfiguration implements Serializable {
* @param unit the time unit in which the duration is defined
*/
public void setConnectionTimeout(int duration, TimeUnit unit) {
+ checkArgument(duration >= 0);
confData.setConnectionTimeoutMs((int) unit.toMillis(duration));
}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 81956db56f7..f2101b28704 100644
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -69,6 +69,7 @@ public class ConsumerConfiguration implements Serializable {
* @return {@link ConsumerConfiguration}
*/
public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit
timeUnit) {
+ checkArgument(ackTimeout >= 0);
long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
"Ack timeout should be should be greater than " +
minAckTimeoutMillis + " ms");
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 009fa67fbaa..a9d913c0164 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin.internal;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -200,18 +201,21 @@ public class PulsarAdminBuilderImpl implements
PulsarAdminBuilder {
@Override
public PulsarAdminBuilder connectionTimeout(int connectionTimeout,
TimeUnit connectionTimeoutUnit) {
+ checkArgument(connectionTimeout >= 0);
this.conf.setConnectionTimeoutMs((int)
connectionTimeoutUnit.toMillis(connectionTimeout));
return this;
}
@Override
public PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit
readTimeoutUnit) {
+ checkArgument(readTimeout >= 0);
this.conf.setReadTimeoutMs((int)
readTimeoutUnit.toMillis(readTimeout));
return this;
}
@Override
public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit
requestTimeoutUnit) {
+ checkArgument(requestTimeout >= 0);
this.conf.setRequestTimeoutMs((int)
requestTimeoutUnit.toMillis(requestTimeout));
return this;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 2d1dd408ef6..046ef3eb370 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -168,6 +168,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public CompletableFuture<Map<String, TransactionMetadata>>
getSlowTransactionsByCoordinatorIdAsync(
Integer coordinatorId, long timeout, TimeUnit timeUnit) {
+ checkArgument(timeout >= 0);
WebTarget path = adminV3Transactions.path("slowTransactions");
path = path.path(timeUnit.toMillis(timeout) + "");
if (coordinatorId != null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 68b781e67d2..a1017e66760 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -329,6 +329,7 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
@Override
public AutoClusterFailoverBuilder checkInterval(long interval,
TimeUnit timeUnit) {
+ checkArgument(interval >= 0L, "check interval time must not be
negative.");
this.checkIntervalMs = timeUnit.toMillis(interval);
return this;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 9a86d81c93f..48b3b2a6bda 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -168,6 +168,7 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) {
+ checkArgument(lookupTimeout >= 0, "lookupTimeout must not be
negative");
conf.setLookupTimeoutMs(unit.toMillis(lookupTimeout));
return this;
}
@@ -333,6 +334,7 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
+ checkArgument(duration >= 0, "connectionTimeout needs to be >= 0");
conf.setConnectionTimeoutMs((int) unit.toMillis(duration));
return this;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 05081dcaa07..d45c125c590 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -449,6 +449,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit
unit) throws PulsarClientException {
+ checkArgument(delayTime >= 0, "The delay time must not be negative.");
reconsumeLater(message, null, delayTime, unit);
}
@@ -563,6 +564,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public CompletableFuture<Void> reconsumeLaterAsync(
Message<?> message, Map<String, String> customProperties, long
delayTime, TimeUnit unit) {
+ checkArgument(delayTime >= 0, "The delay time must not be negative.");
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new
PulsarClientException(RECONSUME_LATER_ERROR_MSG));
}
@@ -599,12 +601,14 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?>
message, long delayTime, TimeUnit unit) {
+ checkArgument(delayTime >= 0, "The delay time must not be negative.");
return reconsumeLaterCumulativeAsync(message, null, delayTime, unit);
}
@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(
Message<?> message, Map<String, String> customProperties, long
delayTime, TimeUnit unit) {
+ checkArgument(delayTime >= 0, "The delay time must not be negative.");
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new
PulsarClientException(RECONSUME_LATER_ERROR_MSG));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f644c6a1839..895273a90c0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -497,6 +497,7 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
@Override
public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long
duration, TimeUnit unit) {
+ checkArgument(duration >= 0, "expired time of incomplete chunk message
must not be negative");
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return this;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index 080d328e3f0..9d30108ec7a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -236,6 +236,7 @@ public class ControlledClusterFailover implements
ServiceUrlProvider {
@Override
public ControlledClusterFailoverBuilder checkInterval(long interval,
@NonNull TimeUnit timeUnit) {
+ checkArgument(interval >= 0, "The check interval time must not be
negative.");
this.interval = timeUnit.toMillis(interval);
return this;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 2860cda0cee..fd01cef9a21 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -274,6 +274,7 @@ public class ReaderBuilderImpl<T> implements
ReaderBuilder<T> {
@Override
public ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long
duration, TimeUnit unit) {
+ checkArgument(duration >= 0, "The expired time must not be negative.");
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return this;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 0b5174a0151..4ea6472b9d8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
@@ -205,6 +206,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit
unit) {
+ checkArgument(timeout >= 0, "The timeout must not be negative.");
if (LOG.isDebugEnabled()) {
LOG.debug("New transaction with timeout in ms {}",
unit.toMillis(timeout));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 026f8a1e69e..895949fdf32 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -212,6 +212,7 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
@Override
public TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit) {
+ checkArgument(delay >= 0, "The delay time must not be negative.");
return deliverAt(System.currentTimeMillis() + unit.toMillis(delay));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 0ebfb91e62d..bb0c4968fc7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.transaction;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -45,6 +46,7 @@ public class TransactionBuilderImpl implements
TransactionBuilder {
@Override
public TransactionBuilder withTransactionTimeout(long txnTimeout, TimeUnit
timeoutUnit) {
+ checkArgument(txnTimeout >= 0, "The txn timeout must not be
negative.");
this.txnTimeout = txnTimeout;
this.timeUnit = timeoutUnit;
return this;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
index dc057ffe32d..cf0620edf98 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.util;
+import static com.google.common.base.Preconditions.checkArgument;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -33,6 +34,7 @@ public class ObjectCache<T> implements Supplier<T> {
public ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit
unit) {
this(supplier, cacheDuration, unit, Clock.systemUTC());
+ checkArgument(cacheDuration >= 0, "The cache duration must not be
negative.");
}
ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit, Clock
clock) {
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index f0d279ef250..998ef73fbd3 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
@@ -364,6 +365,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
}
public synchronized void addEntryDelay(long delay, TimeUnit unit) {
+ checkArgument(delay >= 0, "The delay time must not be negative.");
addEntryDelaysMillis.add(unit.toMillis(delay));
}
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java
index 4d08a7f80df..cf08cc63510 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -65,6 +66,7 @@ public class TestStatsProvider implements StatsProvider {
@Override
public void addLatency(long eventLatency, TimeUnit unit) {
+ checkArgument(eventLatency >= 0, "The event latency must not be
negative.");
long valueMillis = unit.toMillis(eventLatency);
updateMax(val.addAndGet(valueMillis));
}