poorbarcode opened a new pull request, #24178:
URL: https://github.com/apache/pulsar/pull/24178

   ### Motivation
   
   **Issue 1: memory leak when publishing messages with a broken schema**
   - Pulsar client will discard messages which has a broken schema, but it 
never releases the message when discarding the message.
   - You can reproduce the issue with the new test `testBrokenSchema`.
   
   ---
   
   **Issue 2: incorrect replication/producer state**
   - The internal producer of the replicator connected.
   - The producer tries to register a new schema, and the state was changed: 
`Ready -> RegisteringSchema`.
   - The topic's stats response shows there is a producer connected, but the 
value of `replication.connected` or `producer.connected` shows `false`
   - You can reproduce the issue with the new test 
`testProducerConnectStateWhenRegisteringSchema`
   
   ---
   
   **Issue 3: replication lost messages or is out of order**
   
   | `time / task` | `internal producer of replicator` | 
   | 1 | send async `msg1` with a compatible schema |
   | 2 | send asycn `msg2` with a incompatible schema |
   | 3 | send async `msg1` with a compatible schema |
   
   Result:
   - `msg1` was sent
   - `msg2` was discarded due to incompatible schema
   - `msg3` was sent
   
   ---
   
   **Issue 4: Reused a recycled SendCallback, which causes a dangerous issue**
   | `time / task` | `client: publish with a broken schema` | `broker-side: 
handle schema` | `broker-side: disconnect` | `client: close producer`|
   | --- | --- | --- | --- | --- |
   | 1 | Sends new schema to Broker |
   | 2 | | received the request of new schema registration |
   | 3 | | switch to `metadata store threads` or `Bookie client threads` |
   | 4 | | | Disconnect producers due to the unloading topic or others, but the 
socket has not been closed yet |
   | 5 | | | | start to close producer |
   | 6 | | respond to client: `Broken schema`|
   | 7 | Calls "op.callback.sendComplete()", but `op` is still in 
`producer.pendingMessages`<sup>[1]</sup> |
   | 8 | `op.callback` was recycled`|
   | 9 | | | `client-side`: disconnected |
   | 10 | | | | Calls `failPendingMessages`, which provides a failed callback 
for all pending messages |
   
   At `step 10`, the producer will call a failed callback on a recycled 
`SendCallback` which has been recycled at `step 8`, but the object 
`SendCallback` may be used by others, which will cause unexpected and 
dangenrous issues.
   
   **[1]** 
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L894
   
   ```java
       private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, 
SendCallback callback, long expectedCnxEpoch) {
           SchemaInfo schemaInfo = ...;
           getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
               if (ex instanceof 
PulsarClientException.IncompatibleSchemaException) {
                   msg.setSchemaState(MessageImpl.SchemaState.Broken);
                   callback.sendComplete(t, null); // This line calls 
"callback.sendComplete", but the callback was still related to 
"producer.pendingMessages"
               }
               ...
           });
       }
   ```
   
   You can reproduce `issues 3 and 4` with the new test 
`testIncompatibleMultiVersionSchema `, and you will get various errors, but the 
test is not in order to reproduce a special case.
   
   ---
   
   ### Modifications
   
   - Fix the 3 issues that were described in Motivation
   - Print schemas in one line instead of pretty printing, which is helpful for 
searching and filtering.
   
   
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: x


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to