hangc0276 commented on a change in pull request #8717:
URL: https://github.com/apache/pulsar/pull/8717#discussion_r548548467
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
##########
@@ -39,7 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-
+import lombok.val;
Review comment:
please remove this import
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1262,6 +1263,12 @@ public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
topicLevelOffloadPolicies,
OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
getPulsar().getConfig().getProperties());
+
+ if (offloadPolicies != null &&
serviceConfig.getManagedLedgerDataReadPriority() != null) {
+ offloadPolicies.setManagedLedgerOffloadedReadPriority(
Review comment:
Maybe we should check whether offloadPolicies already has a
`ManagedLedgerOffloadedReadPriority` value before overriding it.
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
##########
@@ -69,53 +71,140 @@ public void testOffloadRead() throws Exception {
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);
- ManagedLedgerImpl ledger =
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my_test_ledger", config);
for (int i = 0; i < 25; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
}
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
ledger.offloadPrefix(ledger.getLastConfirmedEntry());
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
- Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
- .filter(e ->
e.getOffloadContext().getComplete()).count(), 2);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e -> e.getOffloadContext().getComplete()).count(), 2);
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
Review comment:
`Assert.assertTrue` should be kept consistent with `assertEquals` (drop the `Assert.` qualifier here as well).
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
##########
@@ -23,22 +23,19 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
Review comment:
The blank line should not be deleted.
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1434,15 +1433,19 @@
+ "Of course, this may degrade consumption throughput.
Default is 10ms.")
private int managedLedgerNewEntriesCheckDelayInMillis = 10;
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "Read priority when ledgers exists in both bookkeeper and
the second layer storage.")
+ private String managedLedgerDataReadPriority =
OffloadPolicies.OffloadedReadPriority.OFFLOADED_FIRST.getValue();
+
/*** --- Load balancer --- ****/
@FieldContext(
- category = CATEGORY_LOAD_BALANCER,
- doc = "Enable load balancer"
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enable load balancer"
)
private boolean loadBalancerEnabled = true;
@Deprecated
@FieldContext(
- category = CATEGORY_LOAD_BALANCER,
+ category = CATEGORY_LOAD_BALANCER,
Review comment:
Please do not change the whitespace (indentation) here.
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
##########
@@ -23,22 +23,19 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.ImmutableSet;
-
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
-
Review comment:
Same as above.
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
##########
@@ -23,22 +23,19 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.ImmutableSet;
-
Review comment:
The blank line should not be deleted.
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -21,9 +21,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
Review comment:
The blank line should not be deleted.
##########
File path:
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
##########
@@ -25,13 +25,12 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
Review comment:
Add a blank line here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]