This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ec609af4a31 [fix][broker] Fix issue with schemaValidationEnforced in
geo-replication (#25012)
ec609af4a31 is described below
commit ec609af4a319ad58600c497fd869dfad0869c226
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Nov 25 11:53:02 2025 +0200
[fix][broker] Fix issue with schemaValidationEnforced in geo-replication
(#25012)
---
...eWayReplicatorSchemaValidationEnforcedTest.java | 103 +++++++++++++++++++++
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../client/impl/schema/AutoProduceBytesSchema.java | 6 ++
3 files changed, 110 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
new file mode 100644
index 00000000000..2dd94bb1cd1
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-replication")
+public class OneWayReplicatorSchemaValidationEnforcedTest extends
OneWayReplicatorTestBase {
+
+ @Override
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Data
+ private static class MyClass {
+ int field1;
+ String field2;
+ Long field3;
+ }
+
+ @Override
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+ super.setConfigDefaults(config, clusterName, bookkeeperEnsemble,
brokerConfigZk);
+ config.setSchemaValidationEnforced(true);
+ // disable topic auto creation so that it's possible to reproduce the
scenario consistently
+ config.setAllowAutoTopicCreation(false);
+ }
+
+ @Test(timeOut = 30000)
+ public void testReplicationWithAvroSchemaWithSchemaValidationEnforced()
throws Exception {
+ Schema<MyClass> myClassSchema = Schema.AVRO(MyClass.class);
+ final String topicName =
+ BrokerTestUtil.newUniqueName("persistent://" +
sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_");
+ // create the topic and schema in the local cluster (r1)
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.schemas().createSchema(topicName,
myClassSchema.getSchemaInfo());
+ // create the topic and schema in the remote cluster (r2)
+ admin2.topics().createNonPartitionedTopic(topicName);
+ admin2.schemas().createSchema(topicName,
myClassSchema.getSchemaInfo());
+
+ // consume from the remote cluster (r2)
+ Consumer<MyClass> consumer2 = client2.newConsumer(myClassSchema)
+ .topic(topicName).subscriptionName("sub").subscribe();
+
+ // produce to local cluster (r1)
+ Producer<MyClass> producer1 =
client1.newProducer(myClassSchema).topic(topicName).create();
+ MyClass sentBody = new MyClass();
+ sentBody.setField1(1);
+ sentBody.setField2("test");
+ sentBody.setField3(123456789L);
+ producer1.send(sentBody);
+
+ // verify that the message was received from the remote cluster (r2)
+ Message<MyClass> received = consumer2.receive(10, TimeUnit.SECONDS);
+ assertThat(received).isNotNull();
+ assertThat(received.getValue()).isNotNull().satisfies(receivedBody -> {
+ assertThat(receivedBody.getField1()).isEqualTo(1);
+ assertThat(receivedBody.getField2()).isEqualTo("test");
+ assertThat(receivedBody.getField3()).isEqualTo(123456789L);
+ });
+ }
+
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a09e11cf1f5..859480f6fc8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -424,7 +424,7 @@ public class PulsarClientImpl implements PulsarClient {
if (schema instanceof AutoProduceBytesSchema) {
AutoProduceBytesSchema autoProduceBytesSchema =
(AutoProduceBytesSchema) schema;
- if (autoProduceBytesSchema.schemaInitialized()) {
+ if (autoProduceBytesSchema.hasUserProvidedSchema()) {
return createProducerAsync(topic, conf, schema, interceptors);
}
return lookup.getSchema(TopicName.get(conf.getTopicName()))
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 8641d14d7df..cc134b57b21 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -35,12 +35,14 @@ public class AutoProduceBytesSchema<T> implements
Schema<byte[]> {
@Setter
private boolean requireSchemaValidation = true;
private Schema<T> schema;
+ private boolean userProvidedSchema;
public AutoProduceBytesSchema() {
}
public AutoProduceBytesSchema(Schema<T> schema) {
this.schema = schema;
+ this.userProvidedSchema = true;
SchemaInfo schemaInfo = schema.getSchemaInfo();
this.requireSchemaValidation = schemaInfo != null
&& schemaInfo.getType() !=
SchemaType.BYTES
@@ -62,6 +64,10 @@ public class AutoProduceBytesSchema<T> implements
Schema<byte[]> {
return schema != null;
}
+ public boolean hasUserProvidedSchema() {
+ return userProvidedSchema;
+ }
+
@Override
public void validate(byte[] message) {
ensureSchemaInitialized();