poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1473733893
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -153,4 +179,203 @@ public void
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
admin2.topics().delete(topicName);
});
}
+
+ private void injectMockReplicatorProducerBuilder(
+ BiFunction<ProducerConfigurationData,
ProducerImpl, ProducerImpl> producerDecorator)
+ throws Exception {
+ String cluster2 = pulsar2.getConfig().getClusterName();
+ BrokerService brokerService = pulsar1.getBrokerService();
+ // Wait for the internal client created.
+ final String topicNameTriggerInternalClientCreate =
+ BrokerTestUtil.newUniqueName("persistent://" +
defaultNamespace + "/tp_");
+
admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
+ waitReplicatorStarted(topicNameTriggerInternalClientCreate);
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicNameTriggerInternalClientCreate);
+ admin2.topics().delete(topicNameTriggerInternalClientCreate);
+ });
+
+ // Inject spy client.
+ ConcurrentOpenHashMap<String, PulsarClient>
+ replicationClients =
WhiteboxImpl.getInternalState(brokerService, "replicationClients");
+ PulsarClientImpl internalClient = (PulsarClientImpl)
replicationClients.get(cluster2);
+ PulsarClient spyClient = spy(internalClient);
+ replicationClients.put(cluster2, spyClient);
+
+ // Inject producer decorator.
+ doAnswer(invocation -> {
+ Schema schema = (Schema) invocation.getArguments()[0];
+ ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl)
internalClient.newProducer(schema);
+ ProducerBuilder spyProducerBuilder = spy(producerBuilder);
+ doAnswer(ignore -> {
+ CompletableFuture<Producer> producerFuture = new
CompletableFuture<>();
+ final ProducerImpl p = (ProducerImpl) producerBuilder.create();
+ new FastThreadLocalThread(() -> {
+ try {
+ ProducerImpl newProducer =
producerDecorator.apply(producerBuilder.getConf(), p);
+ producerFuture.complete(newProducer);
+ } catch (Exception ex) {
+ producerFuture.completeExceptionally(ex);
+ }
+ }).start();
+ return producerFuture;
+ }).when(spyProducerBuilder).createAsync();
+ return spyProducerBuilder;
+ }).when(spyClient).newProducer(any(Schema.class));
+ }
+
+ private SpyCursor spyCursor(PersistentTopic persistentTopic, String
cursorName) throws Exception {
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().get(cursorName);
+ ManagedCursorImpl spyCursor = spy(cursor);
+ // remove cursor.
+ ml.getCursors().removeCursor(cursorName);
+ ml.deactivateCursor(cursor);
+ // Add the spy one. addCursor(ManagedCursorImpl cursor)
+ Method m = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", new
Class[]{ManagedCursorImpl.class});
+ m.setAccessible(true);
+ m.invoke(ml, new Object[]{spyCursor});
+ return new SpyCursor(cursor, spyCursor);
+ }
+
+ @Data
+ @AllArgsConstructor
+ static class SpyCursor {
+ ManagedCursorImpl original;
+ ManagedCursorImpl spy;
+ }
+
+ private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor)
throws Exception {
+ CountDownLatch startCloseSignal = new CountDownLatch(1);
+ CountDownLatch startCallbackSignal = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ AsyncCallbacks.CloseCallback originalCallback =
(AsyncCallbacks.CloseCallback) invocation.getArguments()[0];
+ Object ctx = invocation.getArguments()[1];
+ AsyncCallbacks.CloseCallback newCallback = new
AsyncCallbacks.CloseCallback() {
+ @Override
+ public void closeComplete(Object ctx) {
+ new FastThreadLocalThread(new Runnable() {
+ @Override
+ @SneakyThrows
+ public void run() {
+ startCallbackSignal.await();
+ originalCallback.closeComplete(ctx);
+ }
+ }).start();
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception,
Object ctx) {
+ new FastThreadLocalThread(new Runnable() {
+ @Override
+ @SneakyThrows
+ public void run() {
+ startCallbackSignal.await();
+ originalCallback.closeFailed(exception, ctx);
+ }
+ }).start();
+ }
+ };
+ startCloseSignal.await();
+ spyCursor.original.asyncClose(newCallback, ctx);
+ return null;
+
}).when(spyCursor.spy).asyncClose(any(AsyncCallbacks.CloseCallback.class),
any());
+ return new CursorCloseSignal(startCloseSignal, startCallbackSignal);
+ }
+
+ @AllArgsConstructor
+ static class CursorCloseSignal {
+ CountDownLatch startCloseSignal;
+ CountDownLatch startCallbackSignal;
+
+ void startClose() {
+ startCloseSignal.countDown();
+ }
+
+ void startCallback() {
+ startCallbackSignal.countDown();
+ }
+ }
+
+ /**
+ * See the description and execution flow:
https://github.com/apache/pulsar/pull/21946.
+ * Steps:
+ * - Create topic, but the internal producer of Replicator created failed.
+ * - Unload bundle, the Replicator will be closed, but the internal
producer creation retry has not executed yet.
+ * - The internal producer creation retry execute successfully, the
"repl.cursor" has not been closed yet.
+ * - The topic is wholly closed.
+ * - Verify: the delayed created internal producer will be closed.
+ */
+ @Test
+ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws
Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
+ // Inject an error for "replicator.producer" creation.
+ // The delay time of next retry to create producer is below:
+ // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
+ // If the retry counter is larger than 6, the next creation will be
slow enough to close Replicator.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final int failTimes = 6;
+ injectMockReplicatorProducerBuilder((producerCnf, orginalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully
or not.
+ if (createProducerCounter.incrementAndGet() > failTimes) {
+ return orginalProducer;
+ }
+ log.info("Retry create replicator.producer count: {}",
createProducerCounter);
+ // Release producer and fail callback.
+ orginalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return orginalProducer;
+ });
+
+ // Create topic.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentReplicator replicator =
+ (PersistentReplicator)
persistentTopic.getReplicators().values().iterator().next();
+ // Since we inject a producer creation error, the replicator can not
start successfully.
+ assertFalse(replicator.isConnected());
+
+ // Stuck the closing of the cursor("pulsar.repl"), until the internal
producer of the replicator started.
+ SpyCursor spyCursor =
+ spyCursor(persistentTopic, "pulsar.repl." +
pulsar2.getConfig().getClusterName());
+ CursorCloseSignal cursorCloseSignal =
makeCursorClosingDelay(spyCursor);
+
+ // Unload bundle: call "topic.close(false)".
+ // Stuck start new producer, until the state of replicator change to
Stopped.
+ // The next once of "createProducerSuccessAfterFailTimes" to create
producer will be successfully.
+
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(()
-> {
+ assertTrue(createProducerCounter.get() >= failTimes);
+ });
+ CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+ String state = String.valueOf(replicator.getState());
+ assertTrue(state.equals("Stopped") || state.equals("Terminated"));
+ });
+
+ // Delay close cursor, until "replicator.producer" create successfully.
+ // The next once retry time of create "replicator.producer" will be
3.2s.
+ Thread.sleep(4 * 1000);
+ log.info("Replicator.state: {}", replicator.getState());
+ cursorCloseSignal.startClose();
+ cursorCloseSignal.startCallback();
Review Comment:
> Why do we need to close the cursor with a delay? To make sure the
Replicator can read messages?
It is only to make sure that the `replicator_producer` is not closed because
the cursor has been closed.
- The next retry of creating `replicator_producer` happens after 3.2s.
- Once the `replicator_producer` is created, the Replicator calls
`readMoreEntries`.
- If the cursor is closed: the Replicator gets a `readEntriesFailed` and closes
the `replicator_producer`.
- If the cursor is still available: the cursor keeps the read request pending,
because there are no entries in the backlog.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -153,4 +179,203 @@ public void
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
admin2.topics().delete(topicName);
});
}
+
+ private void injectMockReplicatorProducerBuilder(
+ BiFunction<ProducerConfigurationData,
ProducerImpl, ProducerImpl> producerDecorator)
+ throws Exception {
+ String cluster2 = pulsar2.getConfig().getClusterName();
+ BrokerService brokerService = pulsar1.getBrokerService();
+ // Wait for the internal client created.
+ final String topicNameTriggerInternalClientCreate =
+ BrokerTestUtil.newUniqueName("persistent://" +
defaultNamespace + "/tp_");
+
admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
+ waitReplicatorStarted(topicNameTriggerInternalClientCreate);
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicNameTriggerInternalClientCreate);
+ admin2.topics().delete(topicNameTriggerInternalClientCreate);
+ });
+
+ // Inject spy client.
+ ConcurrentOpenHashMap<String, PulsarClient>
+ replicationClients =
WhiteboxImpl.getInternalState(brokerService, "replicationClients");
+ PulsarClientImpl internalClient = (PulsarClientImpl)
replicationClients.get(cluster2);
+ PulsarClient spyClient = spy(internalClient);
+ replicationClients.put(cluster2, spyClient);
+
+ // Inject producer decorator.
+ doAnswer(invocation -> {
+ Schema schema = (Schema) invocation.getArguments()[0];
+ ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl)
internalClient.newProducer(schema);
+ ProducerBuilder spyProducerBuilder = spy(producerBuilder);
+ doAnswer(ignore -> {
+ CompletableFuture<Producer> producerFuture = new
CompletableFuture<>();
+ final ProducerImpl p = (ProducerImpl) producerBuilder.create();
+ new FastThreadLocalThread(() -> {
+ try {
+ ProducerImpl newProducer =
producerDecorator.apply(producerBuilder.getConf(), p);
+ producerFuture.complete(newProducer);
+ } catch (Exception ex) {
+ producerFuture.completeExceptionally(ex);
+ }
+ }).start();
+ return producerFuture;
+ }).when(spyProducerBuilder).createAsync();
+ return spyProducerBuilder;
+ }).when(spyClient).newProducer(any(Schema.class));
+ }
+
+ private SpyCursor spyCursor(PersistentTopic persistentTopic, String
cursorName) throws Exception {
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().get(cursorName);
+ ManagedCursorImpl spyCursor = spy(cursor);
+ // remove cursor.
+ ml.getCursors().removeCursor(cursorName);
+ ml.deactivateCursor(cursor);
+ // Add the spy one. addCursor(ManagedCursorImpl cursor)
+ Method m = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", new
Class[]{ManagedCursorImpl.class});
+ m.setAccessible(true);
+ m.invoke(ml, new Object[]{spyCursor});
+ return new SpyCursor(cursor, spyCursor);
+ }
+
+ @Data
+ @AllArgsConstructor
+ static class SpyCursor {
+ ManagedCursorImpl original;
+ ManagedCursorImpl spy;
+ }
+
+ private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor)
throws Exception {
+ CountDownLatch startCloseSignal = new CountDownLatch(1);
+ CountDownLatch startCallbackSignal = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ AsyncCallbacks.CloseCallback originalCallback =
(AsyncCallbacks.CloseCallback) invocation.getArguments()[0];
+ Object ctx = invocation.getArguments()[1];
+ AsyncCallbacks.CloseCallback newCallback = new
AsyncCallbacks.CloseCallback() {
+ @Override
+ public void closeComplete(Object ctx) {
+ new FastThreadLocalThread(new Runnable() {
+ @Override
+ @SneakyThrows
+ public void run() {
+ startCallbackSignal.await();
+ originalCallback.closeComplete(ctx);
+ }
+ }).start();
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception,
Object ctx) {
+ new FastThreadLocalThread(new Runnable() {
+ @Override
+ @SneakyThrows
+ public void run() {
+ startCallbackSignal.await();
+ originalCallback.closeFailed(exception, ctx);
+ }
+ }).start();
+ }
+ };
+ startCloseSignal.await();
+ spyCursor.original.asyncClose(newCallback, ctx);
+ return null;
+
}).when(spyCursor.spy).asyncClose(any(AsyncCallbacks.CloseCallback.class),
any());
+ return new CursorCloseSignal(startCloseSignal, startCallbackSignal);
+ }
+
+ @AllArgsConstructor
+ static class CursorCloseSignal {
+ CountDownLatch startCloseSignal;
+ CountDownLatch startCallbackSignal;
+
+ void startClose() {
+ startCloseSignal.countDown();
+ }
+
+ void startCallback() {
+ startCallbackSignal.countDown();
+ }
+ }
+
+ /**
+ * See the description and execution flow:
https://github.com/apache/pulsar/pull/21946.
+ * Steps:
+ * - Create topic, but the internal producer of Replicator created failed.
+ * - Unload bundle, the Replicator will be closed, but the internal
producer creation retry has not executed yet.
+ * - The internal producer creation retry execute successfully, the
"repl.cursor" has not been closed yet.
+ * - The topic is wholly closed.
+ * - Verify: the delayed created internal producer will be closed.
+ */
+ @Test
+ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws
Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
+ // Inject an error for "replicator.producer" creation.
+ // The delay time of next retry to create producer is below:
+ // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
+ // If the retry counter is larger than 6, the next creation will be
slow enough to close Replicator.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final int failTimes = 6;
+ injectMockReplicatorProducerBuilder((producerCnf, orginalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully
or not.
+ if (createProducerCounter.incrementAndGet() > failTimes) {
+ return orginalProducer;
+ }
+ log.info("Retry create replicator.producer count: {}",
createProducerCounter);
+ // Release producer and fail callback.
+ orginalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return orginalProducer;
+ });
+
+ // Create topic.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentReplicator replicator =
+ (PersistentReplicator)
persistentTopic.getReplicators().values().iterator().next();
+ // Since we inject a producer creation error, the replicator can not
start successfully.
+ assertFalse(replicator.isConnected());
+
+ // Stuck the closing of the cursor("pulsar.repl"), until the internal
producer of the replicator started.
+ SpyCursor spyCursor =
+ spyCursor(persistentTopic, "pulsar.repl." +
pulsar2.getConfig().getClusterName());
+ CursorCloseSignal cursorCloseSignal =
makeCursorClosingDelay(spyCursor);
+
+ // Unload bundle: call "topic.close(false)".
+ // Stuck start new producer, until the state of replicator change to
Stopped.
+ // The next once of "createProducerSuccessAfterFailTimes" to create
producer will be successfully.
+
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(()
-> {
+ assertTrue(createProducerCounter.get() >= failTimes);
+ });
+ CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+ String state = String.valueOf(replicator.getState());
+ assertTrue(state.equals("Stopped") || state.equals("Terminated"));
+ });
+
+ // Delay close cursor, until "replicator.producer" create successfully.
+ // The next once retry time of create "replicator.producer" will be
3.2s.
+ Thread.sleep(4 * 1000);
+ log.info("Replicator.state: {}", replicator.getState());
+ cursorCloseSignal.startClose();
+ cursorCloseSignal.startCallback();
Review Comment:
> Why do we need to close the cursor with a delay? To make sure the
Replicator can read messages?
It is only to make sure that the `replicator_producer` is not closed because
the cursor has been closed.
- The next retry of creating `replicator_producer` happens after 3.2s.
- Once the `replicator_producer` is created, the Replicator calls
`readMoreEntries`.
- If the cursor is closed: the Replicator gets a `readEntriesFailed` and closes
the `replicator_producer`.
- If the cursor is still available: the cursor keeps the read request pending,
because there are no entries in the backlog.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]