codelipenghui commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1473766679
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -153,4 +179,203 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
admin2.topics().delete(topicName);
});
}
+
+    /**
+     * Replaces the broker's internal replication client for {@code pulsar2}'s cluster with a
+     * Mockito spy whose {@code newProducer(Schema)} returns a spied builder. Every producer
+     * created through that builder's {@code createAsync()} is passed to {@code producerDecorator}.
+     *
+     * <p>A throwaway replicated topic is created first (and registered for cleanup) so that the
+     * internal replication client exists before it is looked up and replaced.
+     *
+     * @param producerDecorator receives the builder configuration and the freshly created
+     *        producer; its return value completes the future returned by {@code createAsync()},
+     *        and any exception it throws completes that future exceptionally
+     */
+    private void injectMockReplicatorProducerBuilder(
+            BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator)
+            throws Exception {
+        String cluster2 = pulsar2.getConfig().getClusterName();
+        BrokerService brokerService = pulsar1.getBrokerService();
+        // Create a replicated topic and wait for its replicator, so the internal client is created.
+        final String topicNameTriggerInternalClientCreate =
+                BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
+        waitReplicatorStarted(topicNameTriggerInternalClientCreate);
+        cleanupTopics(() -> {
+            admin1.topics().delete(topicNameTriggerInternalClientCreate);
+            admin2.topics().delete(topicNameTriggerInternalClientCreate);
+        });
+
+        // Inject spy client.
+        ConcurrentOpenHashMap<String, PulsarClient> replicationClients =
+                WhiteboxImpl.getInternalState(brokerService, "replicationClients");
+        PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
+        PulsarClient spyClient = spy(internalClient);
+        replicationClients.put(cluster2, spyClient);
+
+        // Inject producer decorator.
+        doAnswer(invocation -> {
+            Schema schema = (Schema) invocation.getArguments()[0];
+            ProducerBuilderImpl producerBuilder =
+                    (ProducerBuilderImpl) internalClient.newProducer(schema);
+            ProducerBuilder spyProducerBuilder = spy(producerBuilder);
+            doAnswer(ignore -> {
+                CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
+                // Create the producer off the caller's thread: createAsync() must neither block
+                // its caller nor throw synchronously, so creation failures also go through the
+                // returned future, just like decorator failures.
+                new FastThreadLocalThread(() -> {
+                    try {
+                        ProducerImpl p = (ProducerImpl) producerBuilder.create();
+                        ProducerImpl newProducer =
+                                producerDecorator.apply(producerBuilder.getConf(), p);
+                        producerFuture.complete(newProducer);
+                    } catch (Exception ex) {
+                        producerFuture.completeExceptionally(ex);
+                    }
+                }).start();
+                return producerFuture;
+            }).when(spyProducerBuilder).createAsync();
+            return spyProducerBuilder;
+        }).when(spyClient).newProducer(any(Schema.class));
+    }
+
+    /**
+     * Replaces the named cursor of the topic's managed ledger with a Mockito spy of it.
+     *
+     * <p>The original cursor is removed from the ledger's cursor container and deactivated. The
+     * spy is then registered through {@code ManagedLedgerImpl#addCursor(ManagedCursorImpl)},
+     * which is invoked reflectively because it is not accessible from this test.
+     *
+     * @param persistentTopic topic whose managed ledger owns the cursor
+     * @param cursorName name of the cursor to replace
+     * @return the original cursor together with the spy registered in its place
+     * @throws IllegalStateException if the ledger has no cursor named {@code cursorName}
+     */
+    private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception {
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(cursorName);
+        if (cursor == null) {
+            // Fail with a clear message rather than an opaque error from spy(null).
+            throw new IllegalStateException("Cursor " + cursorName + " not found on topic "
+                    + persistentTopic.getName());
+        }
+        ManagedCursorImpl spyCursor = spy(cursor);
+        // remove cursor.
+        ml.getCursors().removeCursor(cursorName);
+        ml.deactivateCursor(cursor);
+        // Add the spy one via addCursor(ManagedCursorImpl cursor).
+        Method m = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", ManagedCursorImpl.class);
+        m.setAccessible(true);
+        m.invoke(ml, spyCursor);
+        return new SpyCursor(cursor, spyCursor);
+    }
+
+    /**
+     * Pairs a managed cursor with the Mockito spy that was registered in its place.
+     */
+    @AllArgsConstructor
+    @Data
+    static class SpyCursor {
+        /** The real cursor that the spy replaced. */
+        ManagedCursorImpl original;
+        /** The spy now registered with the managed ledger instead of {@link #original}. */
+        ManagedCursorImpl spy;
+    }
+
+    /**
+     * Makes {@code asyncClose} on the spied cursor controllable by the test.
+     *
+     * <p>A thread that calls {@code spyCursor.spy.asyncClose(callback, ctx)} is blocked until
+     * {@link CursorCloseSignal#startClose()} is called. The close is then forwarded to the
+     * original cursor. Its outcome (success or failure) is delivered to {@code callback} on a
+     * separate thread, and only after {@link CursorCloseSignal#startCallback()} is called.
+     *
+     * @param spyCursor the original/spy cursor pair whose spy is stubbed
+     * @return the signal the test uses to release the close and its callback
+     */
+    private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor) throws Exception {
+        CountDownLatch startCloseSignal = new CountDownLatch(1);
+        CountDownLatch startCallbackSignal = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            AsyncCallbacks.CloseCallback originalCallback =
+                    (AsyncCallbacks.CloseCallback) invocation.getArguments()[0];
+            Object ctx = invocation.getArguments()[1];
+            AsyncCallbacks.CloseCallback newCallback = new AsyncCallbacks.CloseCallback() {
+                @Override
+                public void closeComplete(Object ctx) {
+                    runAfterSignal(startCallbackSignal, () -> originalCallback.closeComplete(ctx));
+                }
+
+                @Override
+                public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                    runAfterSignal(startCallbackSignal,
+                            () -> originalCallback.closeFailed(exception, ctx));
+                }
+            };
+            // Deliberately block the caller of asyncClose until the test releases the close.
+            startCloseSignal.await();
+            spyCursor.original.asyncClose(newCallback, ctx);
+            return null;
+        }).when(spyCursor.spy).asyncClose(any(AsyncCallbacks.CloseCallback.class), any());
+        return new CursorCloseSignal(startCloseSignal, startCallbackSignal);
+    }
+
+    /**
+     * Runs {@code action} on a new thread once {@code signal} reaches zero. If the waiting thread
+     * is interrupted, its interrupt flag is restored, the interruption is logged, and
+     * {@code action} is skipped.
+     */
+    private void runAfterSignal(CountDownLatch signal, Runnable action) {
+        new FastThreadLocalThread(() -> {
+            try {
+                signal.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted while waiting to deliver a cursor close callback", e);
+                return;
+            }
+            action.run();
+        }).start();
+    }
+
+    /**
+     * Test-controlled gates for a delayed cursor close. Each latch is released at most once.
+     */
+    @AllArgsConstructor
+    static class CursorCloseSignal {
+        /** Once released, lets the blocked {@code asyncClose} call proceed. */
+        final CountDownLatch startCloseSignal;
+        /** Once released, lets the close callback be delivered. */
+        final CountDownLatch startCallbackSignal;
+
+        /** Unblocks the pending {@code asyncClose} call. */
+        void startClose() {
+            startCloseSignal.countDown();
+        }
+
+        /** Allows the close callback (success or failure) to be delivered. */
+        void startCallback() {
+            startCallbackSignal.countDown();
+        }
+    }
+
+ /**
+ * See the description and execution flow:
https://github.com/apache/pulsar/pull/21946.
+ * Steps:
+ * - Create topic, but the internal producer of Replicator created failed.
+ * - Unload bundle, the Replicator will be closed, but the internal
producer creation retry has not executed yet.
+ * - The internal producer creation retry execute successfully, the
"repl.cursor" has not been closed yet.
+ * - The topic is wholly closed.
+ * - Verify: the delayed created internal producer will be closed.
+ */
+ @Test
+ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws
Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
+ // Inject an error for "replicator.producer" creation.
+ // The delay time of next retry to create producer is below:
+ // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
+ // If the retry counter is larger than 6, the next creation will be
slow enough to close Replicator.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final int failTimes = 6;
+ injectMockReplicatorProducerBuilder((producerCnf, orginalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully
or not.
+ if (createProducerCounter.incrementAndGet() > failTimes) {
+ return orginalProducer;
+ }
+ log.info("Retry create replicator.producer count: {}",
createProducerCounter);
+ // Release producer and fail callback.
+ orginalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return orginalProducer;
+ });
+
+ // Create topic.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentReplicator replicator =
+ (PersistentReplicator)
persistentTopic.getReplicators().values().iterator().next();
+ // Since we inject a producer creation error, the replicator can not
start successfully.
+ assertFalse(replicator.isConnected());
+
+ // Stuck the closing of the cursor("pulsar.repl"), until the internal
producer of the replicator started.
+ SpyCursor spyCursor =
+ spyCursor(persistentTopic, "pulsar.repl." +
pulsar2.getConfig().getClusterName());
+ CursorCloseSignal cursorCloseSignal =
makeCursorClosingDelay(spyCursor);
+
+ // Unload bundle: call "topic.close(false)".
+ // Stuck start new producer, until the state of replicator change to
Stopped.
+ // The next once of "createProducerSuccessAfterFailTimes" to create
producer will be successfully.
+
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(()
-> {
+ assertTrue(createProducerCounter.get() >= failTimes);
+ });
+ CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+ String state = String.valueOf(replicator.getState());
+ assertTrue(state.equals("Stopped") || state.equals("Terminated"));
+ });
+
+ // Delay close cursor, until "replicator.producer" create successfully.
+ // The next once retry time of create "replicator.producer" will be
3.2s.
+ Thread.sleep(4 * 1000);
+ log.info("Replicator.state: {}", replicator.getState());
+ cursorCloseSignal.startClose();
+ cursorCloseSignal.startCallback();
Review Comment:
OK, but why won't the producer be closed in this case?
<img width="914" alt="image" src="https://github.com/apache/pulsar/assets/12592133/73ef2bbd-778e-4054-9b1e-7dad439ef282">
Shouldn't it eventually be closed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]