smbecker opened a new issue, #250: URL: https://github.com/apache/pulsar-dotpulsar/issues/250
### Description

I have a consumer that uses `SubscriptionType.Failover` so that only a single consumer is active at a time. I would like to configure the topic as `non-persistent`, but I cannot get `SubscriptionType.Failover` to connect correctly for `non-persistent` topics. The included test passes for `persistent` topics but fails for `non-persistent` topics. According to [the docs](https://pulsar.apache.org/docs/next/concepts-messaging/#client-api), failover should work for `non-persistent` topics. Is this a client issue or a server issue?

### Reproduction Steps

```c#
public class PulsarTests : IAsyncLifetime
{
    private readonly PulsarContainer container = new PulsarBuilder().WithImage("apachepulsar/pulsar:latest").Build();

    [Theory]
    [InlineData("non-persistent://public/default/test")]
    [InlineData("persistent://public/default/test")]
    public async Task can_use_failover(string topic)
    {
        ConsumerState state1 = ConsumerState.Closed;
        ConsumerState state2 = ConsumerState.Closed;

        await using var client = PulsarClient.Builder().ServiceUrl(new Uri(container.GetBrokerAddress())).Build();
        await using var consumer1 = client.NewConsumer()
            .Topic(topic)
            .SubscriptionName("test")
            .ConsumerName("test-1")
            .SubscriptionType(SubscriptionType.Failover)
            .StateChangedHandler(e => state1 = e.ConsumerState)
            .Create();
        await using var consumer2 = client.NewConsumer()
            .Topic(topic)
            .SubscriptionName("test")
            .ConsumerName("test-2")
            .SubscriptionType(SubscriptionType.Failover)
            .StateChangedHandler(e => state2 = e.ConsumerState)
            .Create();

        using var cts = new CancellationTokenSource();
        var attempts = 0;
        try
        {
            // ReSharper disable AccessToDisposedClosure
            _ = Task.Run(() => consumer1.Receive(cts.Token).AsTask(), cts.Token);
            _ = Task.Run(() => consumer2.Receive(cts.Token).AsTask(), cts.Token);
            // ReSharper restore AccessToDisposedClosure

            while (true)
            {
                if (state1 == ConsumerState.Active)
                {
                    await consumer1.DisposeAsync();
                }

                if (state2 == ConsumerState.Active)
                {
                    // We should reach here
                    break;
                }

                if (attempts > 20)
                {
                    Assert.Fail("Failed to activate consumer");
                    break;
                }

                attempts++;
                await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
            }
        }
        finally
        {
            await cts.CancelAsync();
        }
    }

    public Task InitializeAsync() => container.StartAsync();

    public Task DisposeAsync() => container.DisposeAsync().AsTask();
}
```

### Expected behavior

Consumers with `SubscriptionType.Failover` against `non-persistent` topics should be activated.

### Actual behavior

Consumers with `SubscriptionType.Failover` against `non-persistent` topics are never activated.

### Regression?

_No response_

### Known Workarounds

Use `persistent` topics rather than `non-persistent` topics.

### Configuration

_No response_

### Other information

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
