lhotari opened a new pull request, #25012:
URL: https://github.com/apache/pulsar/pull/25012

   ### Motivation
   
   Geo-replication currently fails in certain cases when `schemaValidationEnforced` is enabled.
   The replicator gets stuck in a loop, repeatedly logging "IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled" warnings and errors.
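
   The rule behind this error can be modeled in a few lines. This is a simplified sketch, not Pulsar's broker code: `SchemaAdmission` and `check` are hypothetical names, schemas are reduced to type names, and it assumes that a producer using `Schema.BYTES` registers no schema with the broker, which is what makes the replicator's producer look schemaless.

```java
import java.util.Optional;

// Hypothetical, simplified model of the broker rule described by the error message.
// Not Pulsar's broker code: names are illustrative and schemas are just type names.
final class SchemaAdmission {
    enum Result { ACCEPTED, INCOMPATIBLE_SCHEMA }

    // topicSchema: schema already registered on the topic, if any.
    // producerSchema: schema the producer registers; assumed empty for Schema.BYTES.
    // validationEnforced: the schemaValidationEnforced setting.
    static Result check(Optional<String> topicSchema, Optional<String> producerSchema,
                        boolean validationEnforced) {
        if (validationEnforced && topicSchema.isPresent() && producerSchema.isEmpty()) {
            // Rejected: the producer has no schema, the topic has one, validation is enforced.
            return Result.INCOMPATIBLE_SCHEMA;
        }
        // Compatibility checks between two actual schemas are out of scope for this sketch.
        return Result.ACCEPTED;
    }

    public static void main(String[] args) {
        // Replicator producer without a schema, remote topic with a schema, validation enforced.
        System.out.println(check(Optional.of("AVRO"), Optional.empty(), true));
        // Same producer when validation is not enforced.
        System.out.println(check(Optional.of("AVRO"), Optional.empty(), false));
    }
}
```

   Running `main` prints `INCOMPATIBLE_SCHEMA` and then `ACCEPTED`: as long as the producer keeps registering no schema, every retry hits the first branch.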
   
   Example logs from the test that reproduces the issue:
   ```
   2025-11-24T17:45:05,002 - WARN  - [pulsar-io-64-7:ClientCnx] - [id: 0xca37c05a, L:/127.0.0.1:61297 - R:localhost/127.0.0.1:61194] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled
   2025-11-24T17:45:05,002 - ERROR - [pulsar-io-64-7:ProducerImpl] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8] [pulsar.repl.r1-->r2] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833457, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61297"}
   2025-11-24T17:45:05,002 - WARN  - [pulsar-io-64-7:AbstractReplicator] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8 | r1-->r2] Failed to create remote producer (org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833457, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61297"}), retrying in 0.382 s
   2025-11-24T17:45:05,012 - INFO  - [pulsar-io-64-5:AbstractReplicator] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8 | r1-->r2] Starting replicator
   2025-11-24T17:45:05,013 - INFO  - [pulsar-io-64-10:ConnectionPool] - [[id: 0x153925b5, L:/127.0.0.1:61298 - R:localhost/127.0.0.1:61194]] Connected to server
   2025-11-24T17:45:05,013 - INFO  - [pulsar-io-105-14:ServerCnx] - [/127.0.0.1:61298] connected with clientVersion=Pulsar-Java-v4.2.0-SNAPSHOT, clientProtocolVersion=21, proxyVersion=null
   2025-11-24T17:45:05,014 - INFO  - [pulsar-io-64-24:ProducerImpl] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8] [pulsar.repl.r1-->r2] Creating producer on cnx [id: 0xae6e9a97, L:/127.0.0.1:61237 - R:localhost/127.0.0.1:61194]
   2025-11-24T17:45:05,015 - WARN  - [pulsar-io-64-18:ClientCnx] - [id: 0xae6e9a97, L:/127.0.0.1:61237 - R:localhost/127.0.0.1:61194] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled
   2025-11-24T17:45:05,015 - ERROR - [pulsar-io-64-18:ProducerImpl] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8] [pulsar.repl.r1-->r2] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833460, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61237"}
   2025-11-24T17:45:05,015 - WARN  - [pulsar-io-64-18:AbstractReplicator] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8 | r1-->r2] Failed to create remote producer (org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833460, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61237"}), retrying in 0.782 s
   ```
   
   The root cause is in the following code in the Pulsar client, which geo-replication uses:
   
https://github.com/apache/pulsar/blob/c8d6208bb364c3c65215cf235feeaec20c0adb9d/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L426-L443
   
   After the first schema lookup, the result is cached. If the remote cluster's topic has no schema at that point, the schema is initialized to `Schema.BYTES` and stays that way. If the topic is then configured with a schema, the replicator ignores it and keeps replicating with `Schema.BYTES`, so with `schemaValidationEnforced` enabled the broker rejects its producer on every retry.
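
   The caching problem can be reproduced with a small model of the flow described above. It is a sketch under stated assumptions, not the linked client code: `AutoProduceSchemaHolder` is a hypothetical stand-in for the client's auto-produce schema, the `lookup` supplier stands in for the broker schema lookup, and schemas are plain strings.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical model of the pre-fix caching; AutoProduceSchemaHolder stands in for
// AutoProduceBytesSchema and "lookup" for the broker schema lookup. Not the real Pulsar API.
final class BuggyAutoProduce {
    static final String BYTES = "BYTES";

    static final class AutoProduceSchemaHolder {
        private String schema; // null until the first producer creation initializes it

        boolean initialized() { return schema != null; }
        void set(String s) { schema = s; }
        String get() { return schema; }
    }

    // Called on every producer (re)creation, e.g. on each replicator retry.
    static String createProducer(AutoProduceSchemaHolder holder, Supplier<Optional<String>> lookup) {
        if (!holder.initialized()) {
            // Only the first call consults the lookup; a missing schema falls back to BYTES.
            holder.set(lookup.get().orElse(BYTES));
        }
        // Later calls reuse the cached value, even if the topic now has a schema.
        return holder.get();
    }

    public static void main(String[] args) {
        AutoProduceSchemaHolder holder = new AutoProduceSchemaHolder();
        // First attempt: the remote topic has no schema yet.
        System.out.println(createProducer(holder, Optional::empty));
        // Retry after a schema was configured on the remote topic.
        System.out.println(createProducer(holder, () -> Optional.of("AVRO")));
    }
}
```

   Running `main` prints `BYTES` twice: the retry never consults the lookup again, so the schema configured on the remote topic is not picked up.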
   
   ### Modifications
   
   - Modify the logic to use the cached schema only when it was provided by user code through the `org.apache.pulsar.client.api.Schema#AUTO_PRODUCE_BYTES(org.apache.pulsar.client.api.Schema<?>)` method.
   - Continue to cache the schema returned by the lookup, since it is needed for producing messages. However, when the producer is reinitialized, the schema is fetched from the broker each time so that schema updates are detected. This is what fixes the replication issue.
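
   The modified behavior can be sketched with the same kind of hypothetical model (`AutoProduceSchemaHolder` and `lookup` are illustrative stand-ins, not the classes changed in this PR): a schema supplied by user code is used as-is, while any other schema is re-fetched on each producer (re)initialization and then kept for producing messages.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical model of the fixed flow; AutoProduceSchemaHolder stands in for
// AutoProduceBytesSchema and "lookup" for the broker schema lookup. Not the real Pulsar API.
final class FixedAutoProduce {
    static final String BYTES = "BYTES";

    static final class AutoProduceSchemaHolder {
        // true when user code supplied the schema, e.g. via Schema.AUTO_PRODUCE_BYTES(Schema<?>)
        private final boolean userProvided;
        private String schema; // still cached, because producing messages needs it

        AutoProduceSchemaHolder() {
            this.userProvided = false;
        }

        AutoProduceSchemaHolder(String userSchema) {
            this.userProvided = true;
            this.schema = userSchema;
        }

        boolean userProvided() { return userProvided; }
        void set(String s) { schema = s; }
        String get() { return schema; }
    }

    // Called on every producer (re)creation, e.g. on each replicator retry.
    static String createProducer(AutoProduceSchemaHolder holder, Supplier<Optional<String>> lookup) {
        if (!holder.userProvided()) {
            // Re-fetch every time so a schema added to the topic later is detected.
            holder.set(lookup.get().orElse(BYTES));
        }
        return holder.get();
    }

    public static void main(String[] args) {
        AutoProduceSchemaHolder fetched = new AutoProduceSchemaHolder();
        System.out.println(createProducer(fetched, Optional::empty));
        System.out.println(createProducer(fetched, () -> Optional.of("AVRO")));

        AutoProduceSchemaHolder user = new AutoProduceSchemaHolder("JSON");
        System.out.println(createProducer(user, () -> Optional.of("AVRO")));
    }
}
```

   Here `main` prints `BYTES`, then `AVRO`, then `JSON`: the retry picks up the schema added to the topic, and the user-provided schema never triggers a lookup.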
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
