This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6232a140f13 [fix][schema] Error checking schema compatibility on a
schema-less topic via REST API (#22720)
6232a140f13 is described below
commit 6232a140f13ce2e8259d654a68c8f9656746b183
Author: Baodi Shi <[email protected]>
AuthorDate: Thu May 16 17:36:57 2024 +0800
[fix][schema] Error checking schema compatibility on a schema-less topic
via REST API (#22720)
(cherry picked from commit 101aee4543fb66035165d8744def630f9a9c3a59)
---
.../schema/AvroSchemaBasedCompatibilityCheck.java | 6 +++--
.../ProtobufNativeSchemaCompatibilityCheck.java | 4 ++-
.../service/schema/SchemaRegistryServiceImpl.java | 2 +-
.../exceptions/IncompatibleSchemaException.java | 4 +++
.../pulsar/broker/admin/AdminApiSchemaTest.java | 30 ++++++++++++++++++++++
.../broker/service/schema/SchemaServiceTest.java | 4 +--
6 files changed, 43 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index 1e75834a129..e5fc7800c51 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -64,8 +64,10 @@ abstract class AvroSchemaBasedCompatibilityCheck implements
SchemaCompatibilityC
log.warn("Error during schema parsing: {}", e.getMessage());
throw new IncompatibleSchemaException(e);
} catch (SchemaValidationException e) {
- log.warn("Error during schema compatibility check: {}",
e.getMessage());
- throw new IncompatibleSchemaException(e);
+ String msg = String.format("Error during schema compatibility
check with strategy %s: %s: %s",
+ strategy, e.getClass().getName(), e.getMessage());
+ log.warn(msg);
+ throw new IncompatibleSchemaException(msg, e);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
index 16b3b33ec78..fc935e80dca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
@@ -67,7 +67,9 @@ public class ProtobufNativeSchemaCompatibilityCheck
implements SchemaCompatibili
private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor
toDescriptor,
SchemaCompatibilityStrategy
strategy) throws IncompatibleSchemaException {
if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) {
- throw new IncompatibleSchemaException("Protobuf root message isn't
allow change!");
+ throw new IncompatibleSchemaException("Protobuf root message
change is not allowed under the '"
+ + strategy + "' strategy. Original message name: '" +
fromDescriptor.getFullName()
+ + "', new message name: '" + toDescriptor.getFullName() +
"'.");
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index ae56df248d8..903f57cb780 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -473,7 +473,7 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
}
return result;
} else {
- return FutureUtils.exception(new
IncompatibleSchemaException("Do not have existing schema."));
+ return CompletableFuture.completedFuture(null);
}
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
index c1a2d9fd703..bbe2f4111d7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
@@ -33,6 +33,10 @@ public class IncompatibleSchemaException extends
SchemaException {
super(message);
}
+ public IncompatibleSchemaException(String message, Throwable e) {
+ super(message, e);
+ }
+
public IncompatibleSchemaException(Throwable e) {
super(e);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index f67bd6fcfce..34d7dbeb818 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -467,4 +467,34 @@ public class AdminApiSchemaTest extends
MockedPulsarServiceBaseTest {
assertTrue(e.getMessage().contains("Incompatible schema: exists
schema type STRING, new schema type INT8"));
}
}
+
+ @Test
+ public void testCompatibilityWithEmpty() throws Exception {
+ List<Schema<?>> checkSchemas = List.of(
+ Schema.STRING,
+
Schema.JSON(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()),
+
Schema.AVRO(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()),
+ Schema.KeyValue(Schema.STRING, Schema.STRING)
+ );
+ for (Schema<?> schema : checkSchemas) {
+ SchemaInfo schemaInfo = schema.getSchemaInfo();
+ String topicName = schemaCompatibilityNamespace +
"/testCompatibilityWithEmpty";
+ PostSchemaPayload postSchemaPayload = new
PostSchemaPayload(schemaInfo.getType().toString(),
+ schemaInfo.getSchemaDefinition(), new HashMap<>());
+
+ // check compatibility with empty schema
+ IsCompatibilityResponse isCompatibilityResponse =
+ admin.schemas().testCompatibility(topicName,
postSchemaPayload);
+ assertTrue(isCompatibilityResponse.isCompatibility());
+
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(),
SchemaCompatibilityStrategy.FULL.name());
+
+ // set schema compatibility strategy to FULL_TRANSITIVE to cover
checkCompatibilityWithAll
+
admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaCompatibilityStrategy.FULL_TRANSITIVE);
+ isCompatibilityResponse =
admin.schemas().testCompatibility(topicName, postSchemaPayload);
+ assertTrue(isCompatibilityResponse.isCompatibility());
+
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(),
SchemaCompatibilityStrategy.FULL_TRANSITIVE.name());
+ // set back to FULL
+
admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaCompatibilityStrategy.FULL);
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 2d8b610e04d..3954920b9d0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.schema;
-import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
@@ -47,7 +46,6 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -406,7 +404,7 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
.build(),
SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new
byte[0])
.build(), KeyValueEncodingType.SEPARATED);
- assertThrows(PulsarAdminException.ServerSideErrorException.class, ()
-> admin.schemas().testCompatibility(topicName, schemaInfo));
+ Assert.assertTrue(admin.schemas().testCompatibility(topicName,
schemaInfo).isCompatibility());
admin.schemas().createSchema(topicName, schemaInfo);
final IsCompatibilityResponse isCompatibilityResponse =
admin.schemas().testCompatibility(topicName, schemaInfo);