poorbarcode opened a new pull request, #24178: URL: https://github.com/apache/pulsar/pull/24178
### Motivation

**Issue 1: memory leak when publishing messages with a broken schema**

- The Pulsar client discards messages that have a broken schema, but it never releases those messages when discarding them.
- You can reproduce the issue with the new test `testBrokenSchema`.

---

**Issue 2: incorrect replication/producer state**

- The internal producer of the replicator is connected.
- The producer tries to register a new schema, so its state changes from `Ready` to `RegisteringSchema`.
- The topic stats show a connected producer, but `replication.connected` or `producer.connected` is `false`.
- You can reproduce the issue with the new test `testProducerConnectStateWhenRegisteringSchema`.

---

**Issue 3: replication loses messages or delivers them out of order**

| `time / task` | `internal producer of replicator` |
| --- | --- |
| 1 | send async `msg1` with a compatible schema |
| 2 | send async `msg2` with an incompatible schema |
| 3 | send async `msg3` with a compatible schema |

Result:

- `msg1` was sent
- `msg2` was discarded due to the incompatible schema
- `msg3` was sent

---

**Issue 4: a recycled `SendCallback` is reused, which is dangerous**

| `time / task` | `client: publish with a broken schema` | `broker-side: handle schema` | `broker-side: disconnect` | `client: close producer` |
| --- | --- | --- | --- | --- |
| 1 | Sends the new schema to the broker | | | |
| 2 | | Receives the new schema registration request | | |
| 3 | | Switches to `metadata store threads` or `Bookie client threads` | | |
| 4 | | | Disconnects producers (for example, because the topic is being unloaded), but the socket has not been closed yet | |
| 5 | | | | Starts to close the producer |
| 6 | | Responds to the client: `Broken schema` | | |
| 7 | Calls `op.callback.sendComplete()`, but `op` is still in `producer.pendingMessages`<sup>[1]</sup> | | | |
| 8 | `op.callback` is recycled | | | |
| 9 | | | `client-side`: disconnected | |
| 10 | | | | Calls `failPendingMessages`, which invokes the failure callback for every pending message |

At
`step 10`, the producer invokes the failure callback on a `SendCallback` that was already recycled at `step 8`. Because the recycled object may meanwhile be in use elsewhere, this can cause unexpected and dangerous behavior.

**[1]** https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L894

```java
private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
    SchemaInfo schemaInfo = ...;
    getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
        if (ex instanceof PulsarClientException.IncompatibleSchemaException) {
            msg.setSchemaState(MessageImpl.SchemaState.Broken);
            // This completes the callback, but the callback is still referenced by "producer.pendingMessages"
            callback.sendComplete(t, null);
        }
        ...
    });
}
```

You can reproduce issues 3 and 4 with the new test `testIncompatibleMultiVersionSchema`. It can fail with various errors, because it is not designed to reproduce one specific scenario.

---

### Modifications

- Fix the issues described in Motivation.
- Print schemas on a single line instead of pretty-printing them, which makes them easier to search and filter.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: x
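The ordering hazard behind Issue 4 can be illustrated with a small, self-contained model. This is a hypothetical sketch, not `ProducerImpl`'s code: `PooledCallback`, `PendingOp`, `ProducerModel` and `runScenario` are illustrative names, and object recycling is reduced to a flag. It shows that completing a callback while its op is still in the pending queue lets `failPendingMessages` complete it a second time, and that removing the op from the queue first is one ordering that avoids this (not necessarily the approach this PR takes).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the Issue 4 race; NOT Pulsar's ProducerImpl.
// PooledCallback, PendingOp and ProducerModel are illustrative names only.
public class RecycledCallbackDemo {

    // Stands in for a pooled SendCallback: once completed, it returns itself to a pool.
    static final class PooledCallback {
        private final List<PooledCallback> pool;
        private boolean recycled;
        int completionsAfterRecycle; // completions delivered to an already-recycled instance

        PooledCallback(List<PooledCallback> pool) {
            this.pool = pool;
        }

        void sendComplete(Exception e) {
            if (recycled) {
                // In a real pool, this instance may already serve another message.
                completionsAfterRecycle++;
                return;
            }
            recycled = true;
            pool.add(this);
        }
    }

    // Stands in for a pending send operation that holds its callback.
    static final class PendingOp {
        final PooledCallback callback;

        PendingOp(PooledCallback callback) {
            this.callback = callback;
        }
    }

    static final class ProducerModel {
        final Deque<PendingOp> pendingMessages = new ArrayDeque<>();
        private final boolean removeBeforeComplete;

        ProducerModel(boolean removeBeforeComplete) {
            this.removeBeforeComplete = removeBeforeComplete;
        }

        // Broker rejected the schema (steps 6-8 of the Issue 4 table).
        void onIncompatibleSchema(PendingOp op) {
            if (removeBeforeComplete) {
                pendingMessages.remove(op); // detach first so no other path can complete it
            }
            op.callback.sendComplete(new IllegalStateException("Broken schema"));
        }

        // Connection lost (step 10): fail everything still pending.
        void failPendingMessages(Exception e) {
            PendingOp op;
            while ((op = pendingMessages.poll()) != null) {
                op.callback.sendComplete(e);
            }
        }
    }

    // Replays steps 6-10 and returns how many completions hit a recycled callback.
    static int runScenario(boolean removeBeforeComplete) {
        List<PooledCallback> pool = new ArrayList<>();
        PooledCallback callback = new PooledCallback(pool);
        ProducerModel producer = new ProducerModel(removeBeforeComplete);
        PendingOp op = new PendingOp(callback);
        producer.pendingMessages.add(op);

        producer.onIncompatibleSchema(op);
        producer.failPendingMessages(new IllegalStateException("disconnected"));
        return callback.completionsAfterRecycle;
    }

    public static void main(String[] args) {
        System.out.println("complete while queued: " + runScenario(false));
        System.out.println("remove before complete: " + runScenario(true));
    }
}
```

In this model, completing the callback while the op is still queued yields one completion on a recycled callback, while removing the op before completing it yields none.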
