This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a4c6c4187d4 [improve][broker] make managedLedgerOffloadedReadPriority
compatible with broker property (#17803)
a4c6c4187d4 is described below
commit a4c6c4187d4a43326ec421577e394b596ab2cac3
Author: YingQun Zhong <[email protected]>
AuthorDate: Tue Nov 1 09:35:21 2022 +0800
[improve][broker] make managedLedgerOffloadedReadPriority compatible with
broker property (#17803)
---
conf/broker.conf | 4 ++
.../common/policies/data/OffloadPoliciesImpl.java | 9 ++++-
.../common/policies/data/OffloadPoliciesTest.java | 46 ++++++++++++++++++++++
3 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 76f75a7d825..85ffc11d34f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1149,6 +1149,10 @@ managedLedgerOffloadDeletionLagMs=14400000
# (default is -1, which is disabled)
managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1
+# Read priority when ledgers exist in both bookkeeper and the second layer
storage
+# (tiered-storage-first/bookkeeper-first, default is tiered-storage-first)
+managedLedgerDataReadPriority=tiered-storage-first
+
# The number of seconds before triggering automatic offload to long term
storage
# (default is -1, which is disabled)
managedLedgerOffloadThresholdInSeconds=-1
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 9b4e8506c89..4eee063cc96 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -75,6 +75,7 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
"managedLedgerOffloadAutoTriggerSizeThresholdBytes";
public static final String DELETION_LAG_NAME_IN_CONF_FILE =
"managedLedgerOffloadDeletionLagMs";
+ public static final String DATA_READ_PRIORITY_NAME_IN_CONF_FILE =
"managedLedgerDataReadPriority";
public static final OffloadedReadPriority DEFAULT_OFFLOADED_READ_PRIORITY =
OffloadedReadPriority.TIERED_STORAGE_FIRST;
@@ -261,9 +262,10 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
Long.parseLong(properties.getProperty(DELETION_LAG_NAME_IN_CONF_FILE)));
}
- if (properties.containsKey("managedLedgerDataReadPriority")) {
+ if (!properties.containsKey("managedLedgerOffloadedReadPriority")
+ &&
properties.containsKey(DATA_READ_PRIORITY_NAME_IN_CONF_FILE)) {
setManagedLedgerOffloadedReadPriority(
-
OffloadedReadPriority.fromString(properties.getProperty("managedLedgerDataReadPriority")));
+
OffloadedReadPriority.fromString(properties.getProperty(DATA_READ_PRIORITY_NAME_IN_CONF_FILE)));
}
}
@@ -479,6 +481,9 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
} else if
(field.getName().equals("managedLedgerOffloadDeletionLagInMillis")) {
object =
properties.getProperty("managedLedgerOffloadDeletionLagInMillis",
properties.getProperty(DELETION_LAG_NAME_IN_CONF_FILE));
+ } else if
(field.getName().equals("managedLedgerOffloadedReadPriority")) {
+ object =
properties.getProperty("managedLedgerOffloadedReadPriority",
+
properties.getProperty(DATA_READ_PRIORITY_NAME_IN_CONF_FILE));
} else {
object = properties.get(field.getName());
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index f6496af2cfe..c0d45389821 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -308,4 +308,50 @@ public class OffloadPoliciesTest {
Assert.assertNull(offloadPolicies.getS3ManagedLedgerOffloadRegion());
}
+
+ @Test
+ public void brokerPropertyCompatibleTest() {
+ final Long brokerOffloadThreshold = 0L;
+ final Long brokerDeletionLag = 2000L;
+ final String brokerReadPriority = "bookkeeper-first";
+
+ // 1. mergeConfiguration test
+ Properties brokerProperties = new Properties();
+
brokerProperties.setProperty("managedLedgerOffloadAutoTriggerSizeThresholdBytes",
"" + brokerOffloadThreshold);
+ brokerProperties.setProperty("managedLedgerOffloadDeletionLagMs", "" +
brokerDeletionLag);
+ brokerProperties.setProperty("managedLedgerDataReadPriority", "" +
brokerReadPriority);
+ OffloadPoliciesImpl offloadPolicies =
+ OffloadPoliciesImpl.mergeConfiguration(null, null, brokerProperties);
+ Assert.assertNotNull(offloadPolicies);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
brokerOffloadThreshold);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
brokerDeletionLag);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(),
brokerReadPriority);
+
+ // 2. compatibleWithBrokerConfigFile test
+ brokerProperties = new Properties();
+
brokerProperties.setProperty("managedLedgerOffloadAutoTriggerSizeThresholdBytes",
"" + brokerOffloadThreshold);
+ brokerProperties.setProperty("managedLedgerOffloadDeletionLagMs", "" +
brokerDeletionLag);
+ brokerProperties.setProperty("managedLedgerDataReadPriority", "" +
brokerReadPriority);
+ offloadPolicies = OffloadPoliciesImpl.create(brokerProperties);
+ Assert.assertNotNull(offloadPolicies);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
brokerOffloadThreshold);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
brokerDeletionLag);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(),
brokerReadPriority);
+
+
+ brokerProperties = new Properties();
+ // 2.1 offload properties name in OffloadPoliciesImpl
+ brokerProperties.setProperty("managedLedgerOffloadThresholdInBytes",
"" + (brokerOffloadThreshold));
+
brokerProperties.setProperty("managedLedgerOffloadDeletionLagInMillis", "" +
(brokerDeletionLag));
+ brokerProperties.setProperty("managedLedgerOffloadedReadPriority", ""
+ (brokerReadPriority));
+ // 2.2 offload properties name in conf file
+
brokerProperties.setProperty("managedLedgerOffloadAutoTriggerSizeThresholdBytes",
"" + brokerOffloadThreshold + 30);
+ brokerProperties.setProperty("managedLedgerOffloadDeletionLagMs", "" +
brokerDeletionLag + 30);
+ brokerProperties.setProperty("managedLedgerDataReadPriority", "" +
"tiered-storage-first");
+ offloadPolicies = OffloadPoliciesImpl.create(brokerProperties);
+ Assert.assertNotNull(offloadPolicies);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
brokerOffloadThreshold);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
brokerDeletionLag);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(),
brokerReadPriority);
+ }
}