sijie opened a new pull request #7133:
URL: https://github.com/apache/pulsar/pull/7133
*Motivation*
When a reader attempts to open a non-durable cursor on an empty managed
ledger, an NPE is thrown and the reader cannot be created on the topic.
```
2020-06-01 12:07:30.673 [pulsar-client-io-66-2] WARN
org.apache.pulsar.client.impl.PulsarClientImpl -
[persistent://public/default/test-partition-1] Failed to get create topic reader
java.util.concurrent.CompletionException:
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 10333
lookup request timedout after ms 30000
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
[na:1.8.0_242]
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
[na:1.8.0_242]
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:714)
[na:1.8.0_242]
at
java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
~[na:1.8.0_242]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
[na:1.8.0_242]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
[na:1.8.0_242]
at
org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1026)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.client.impl.ClientCnx.lambda$channelActive$0(ClientCnx.java:187)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
Caused by:
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 10333
lookup request timedout after ms 30000
at
org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1025)
~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
... 10 common frames omitted
```
NPE on the broker side:
```
Caused by: java.lang.NullPointerException
at
org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl.recoverCursor(NonDurableCursorImpl.java:65)
~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
at
org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl.<init>(NonDurableCursorImpl.java:51)
~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:855)
~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:692)
~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
~[org.apache.pulsar-pulsar-common-2.5.1.jar:2.5.1]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
~[org.apache.pulsar-pulsar-common-2.5.1.jar:2.5.1]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:675)
~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at
org.apache.pulsar.broker.service.ServerCnx.lambda$null$11(ServerCnx.java:824)
~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
~[?:1.8.0_242]
```
*Modifications*
- Change the `nextPosition` logic to set the entryId of nextPosition to 0 if
it is an invalid entry (entryId < 0)
- Handle the case where `readPosition` is null
- Fix the PersistentTopic logic to handle only valid entry positions
(ledgerId >= 0 && entryId >= 0)
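
The first and third modifications can be sketched roughly as below. This is a minimal, self-contained illustration and not the code from this PR: the `Position` class and the `nextPosition` and `isValidPosition` helpers are hypothetical stand-ins for the managed-ledger types. It assumes "invalid entry" refers to the entryId of the position being advanced.

```java
public class PositionSketch {

    // Hypothetical stand-in for a managed-ledger position (ledgerId, entryId).
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Assumed reading of the first modification: when the current entryId is
    // invalid (< 0), the next position is entry 0 of the same ledger;
    // otherwise it is simply the following entry.
    static Position nextPosition(Position current) {
        long nextEntryId = current.entryId < 0 ? 0 : current.entryId + 1;
        return new Position(current.ledgerId, nextEntryId);
    }

    // Assumed reading of the third modification: a position is only treated
    // as a valid entry position when both ids are non-negative. The null
    // check is an extra guard added for this sketch.
    static boolean isValidPosition(Position p) {
        return p != null && p.ledgerId >= 0 && p.entryId >= 0;
    }

    public static void main(String[] args) {
        Position beforeFirstEntry = new Position(7, -1);
        System.out.println(nextPosition(beforeFirstEntry));       // entry 0 of ledger 7
        System.out.println(nextPosition(new Position(7, 3)));     // entry 4 of ledger 7
        System.out.println(isValidPosition(beforeFirstEntry));    // not a valid entry position
        System.out.println(isValidPosition(new Position(7, 0)));  // valid entry position
    }
}
```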