This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e01940d [Schema] Schema comparison logic change. (#9612)
e01940d is described below
commit e01940dd4fc1e81e44c948e9235a37e780ca610a
Author: congbo <[email protected]>
AuthorDate: Tue Feb 23 21:24:14 2021 +0800
[Schema] Schema comparison logic change. (#9612)
* [Schema] Schema comparison logic change.
* Add the test logic
* change the compare logic
* reimplement
* Fix the test
* Fix some comments
* Change the schema-type check to a switch statement.
Co-authored-by: congbo <[email protected]>
---
.../service/schema/SchemaRegistryServiceImpl.java | 59 ++++--
.../broker/service/schema/SchemaServiceTest.java | 201 ++++++++-------------
.../java/org/apache/pulsar/schema/SchemaTest.java | 17 +-
.../SchemaCompatibilityCheckTest.java | 52 ++++++
4 files changed, 187 insertions(+), 142 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 364628f..d9dd91f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.schema;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
@@ -40,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.collections.CollectionUtils;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -141,17 +143,17 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
if (schemaVersion != null) {
return CompletableFuture.completedFuture(schemaVersion);
}
- CompletableFuture<Void> checkCompatibilityFurture = new
CompletableFuture<>();
+ CompletableFuture<Void> checkCompatibilityFuture = new
CompletableFuture<>();
if (schemaAndMetadataList.size() != 0) {
if (isTransitiveStrategy(strategy)) {
- checkCompatibilityFurture =
checkCompatibilityWithAll(schema, strategy, schemaAndMetadataList);
+ checkCompatibilityFuture =
checkCompatibilityWithAll(schema, strategy, schemaAndMetadataList);
} else {
- checkCompatibilityFurture =
checkCompatibilityWithLatest(schemaId, schema, strategy);
+ checkCompatibilityFuture =
checkCompatibilityWithLatest(schemaId, schema, strategy);
}
} else {
- checkCompatibilityFurture.complete(null);
+ checkCompatibilityFuture.complete(null);
}
- return checkCompatibilityFurture.thenCompose(v -> {
+ return checkCompatibilityFuture.thenCompose(v -> {
byte[] context =
hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info =
SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
@@ -293,12 +295,36 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
SchemaData schemaData) {
final CompletableFuture<SchemaVersion> completableFuture = new
CompletableFuture<>();
SchemaVersion schemaVersion;
- for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
- if
(Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
- hashFunction.hashBytes(schemaData.getData()).asBytes())) {
- schemaVersion = schemaAndMetadata.version;
- completableFuture.complete(schemaVersion);
- return completableFuture;
+ if (isUsingAvroSchemaParser(schemaData.getType())) {
+ Schema.Parser parser = new Schema.Parser();
+ Schema newSchema = parser.parse(new String(schemaData.getData(),
UTF_8));
+
+ for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+ if (isUsingAvroSchemaParser(schemaData.getType())) {
+ Schema.Parser existParser = new Schema.Parser();
+ Schema existSchema = existParser.parse(new
String(schemaAndMetadata.schema.getData(), UTF_8));
+ if (newSchema.equals(existSchema)) {
+ schemaVersion = schemaAndMetadata.version;
+ completableFuture.complete(schemaVersion);
+ return completableFuture;
+ }
+ } else {
+ if
(Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
+
hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+ schemaVersion = schemaAndMetadata.version;
+ completableFuture.complete(schemaVersion);
+ return completableFuture;
+ }
+ }
+ }
+ } else {
+ for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+ if
(Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
+
hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+ schemaVersion = schemaAndMetadata.version;
+ completableFuture.complete(schemaVersion);
+ return completableFuture;
+ }
}
}
completableFuture.complete(null);
@@ -467,4 +493,15 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
}
}
+ public static boolean isUsingAvroSchemaParser(SchemaType type) { // reports whether the given schema type is one of the three listed below
+ switch (type) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF:
+ return true; // findSchemaVersion parses these with Avro's Schema.Parser and compares the parsed schemas
+ default:
+ return false; // findSchemaVersion compares every other type by the hash of its raw bytes
+ }
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 6b2f192..a623f05 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -42,8 +42,6 @@ import
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaStorage;
-import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -53,37 +51,27 @@ import org.testng.annotations.Test;
public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
- private static Clock MockClock = Clock.fixed(Instant.EPOCH,
ZoneId.systemDefault());
-
- private String schemaId1 = "1/2/3/4";
- private String userId = "user";
-
- private SchemaData schema1 = SchemaData.builder()
- .user(userId)
- .type(SchemaType.JSON)
- .timestamp(MockClock.millis())
- .isDeleted(false)
- .data("message { required int64 a = 1};".getBytes())
- .props(new TreeMap<>())
- .build();
-
- private SchemaData schema2 = SchemaData.builder()
- .user(userId)
- .type(SchemaType.JSON)
- .timestamp(MockClock.millis())
- .isDeleted(false)
- .data("message { required int64 b = 1};".getBytes())
- .props(new TreeMap<>())
- .build();
-
- private SchemaData schema3 = SchemaData.builder()
- .user(userId)
- .type(SchemaType.JSON)
- .timestamp(MockClock.millis())
- .isDeleted(false)
- .data("message { required int64 c = 1};".getBytes())
- .props(new TreeMap<>())
- .build();
+ private static final Clock MockClock = Clock.fixed(Instant.EPOCH,
ZoneId.systemDefault());
+
+ private final String schemaId1 = "1/2/3/4";
+ private final static String userId = "user";
+
+ private final static String schemaJson1 =
+
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema"
+
+
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+ private final static SchemaData schemaData1 = getSchemaData(schemaJson1);
+
+ private final static String schemaJson2 =
+
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema"
+
+
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
+
+
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
+ private final static SchemaData schemaData2 = getSchemaData(schemaJson2);
+
+ private final static String schemaJson3 =
+
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema"
+
+
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
+
+ "{\"name\":\"field2\",\"type\":\"string\"}]}";
+ private final static SchemaData schemaData3 = getSchemaData(schemaJson3);
private SchemaRegistryServiceImpl schemaRegistryService;
@@ -109,10 +97,10 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
@Test
public void writeReadBackDeleteSchemaEntry() throws Exception {
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
SchemaData latest = getLatestSchema(schemaId1, version(0));
- assertEquals(schema1, latest);
+ assertEquals(schemaData1, latest);
deleteSchema(schemaId1, version(1));
@@ -121,154 +109,129 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
@Test
public void findSchemaVersionTest() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1,
schema1).get().longValue());
+ putSchema(schemaId1, schemaData1, version(0));
+ assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1,
schemaData1).get().longValue());
}
@Test
public void deleteSchemaAndAddSchema() throws Exception {
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
SchemaData latest = getLatestSchema(schemaId1, version(0));
- assertEquals(schema1, latest);
+ assertEquals(schemaData1, latest);
deleteSchema(schemaId1, version(1));
assertNull(schemaRegistryService.getSchema(schemaId1).get());
- putSchema(schemaId1, schema1, version(2));
+ putSchema(schemaId1, schemaData1, version(2));
latest = getLatestSchema(schemaId1, version(2));
- assertEquals(schema1, latest);
+ assertEquals(schemaData1, latest);
}
@Test
public void getReturnsTheLastWrittenEntry() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
SchemaData latest = getLatestSchema(schemaId1, version(1));
- assertEquals(schema2, latest);
+ assertEquals(schemaData2, latest);
}
@Test
public void getByVersionReturnsTheCorrectEntry() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
SchemaData version0 = getSchema(schemaId1, version(0));
- assertEquals(schema1, version0);
+ assertEquals(schemaData1, version0);
}
@Test
public void getByVersionReturnsTheCorrectEntry2() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
SchemaData version1 = getSchema(schemaId1, version(1));
- assertEquals(schema2, version1);
+ assertEquals(schemaData2, version1);
}
@Test
public void getByVersionReturnsTheCorrectEntry3() throws Exception {
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
SchemaData version1 = getSchema(schemaId1, version(0));
- assertEquals(schema1, version1);
+ assertEquals(schemaData1, version1);
}
@Test
public void getAllVersionSchema() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
- putSchema(schemaId1, schema3, version(2));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
+ putSchema(schemaId1, schemaData3, version(2));
List<SchemaData> allSchemas = getAllSchemas(schemaId1);
- assertEquals(schema1, allSchemas.get(0));
- assertEquals(schema2, allSchemas.get(1));
- assertEquals(schema3, allSchemas.get(2));
+ assertEquals(schemaData1, allSchemas.get(0));
+ assertEquals(schemaData2, allSchemas.get(1));
+ assertEquals(schemaData3, allSchemas.get(2));
}
@Test
public void addLotsOfEntriesThenDelete() throws Exception {
- SchemaData randomSchema1 = randomSchema();
- SchemaData randomSchema2 = randomSchema();
- SchemaData randomSchema3 = randomSchema();
- SchemaData randomSchema4 = randomSchema();
- SchemaData randomSchema5 = randomSchema();
- SchemaData randomSchema6 = randomSchema();
- SchemaData randomSchema7 = randomSchema();
-
- putSchema(schemaId1, randomSchema1, version(0));
- putSchema(schemaId1, randomSchema2, version(1));
- putSchema(schemaId1, randomSchema3, version(2));
- putSchema(schemaId1, randomSchema4, version(3));
- putSchema(schemaId1, randomSchema5, version(4));
- putSchema(schemaId1, randomSchema6, version(5));
- putSchema(schemaId1, randomSchema7, version(6));
+
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
+ putSchema(schemaId1, schemaData3, version(2));
SchemaData version0 = getSchema(schemaId1, version(0));
- assertEquals(randomSchema1, version0);
+ assertEquals(schemaData1, version0);
SchemaData version1 = getSchema(schemaId1, version(1));
- assertEquals(randomSchema2, version1);
+ assertEquals(schemaData2, version1);
SchemaData version2 = getSchema(schemaId1, version(2));
- assertEquals(randomSchema3, version2);
-
- SchemaData version3 = getSchema(schemaId1, version(3));
- assertEquals(randomSchema4, version3);
+ assertEquals(schemaData3, version2);
- SchemaData version4 = getSchema(schemaId1, version(4));
- assertEquals(randomSchema5, version4);
+ deleteSchema(schemaId1, version(3));
- SchemaData version5 = getSchema(schemaId1, version(5));
- assertEquals(randomSchema6, version5);
-
- SchemaData version6 = getSchema(schemaId1, version(6));
- assertEquals(randomSchema7, version6);
-
- deleteSchema(schemaId1, version(7));
-
- SchemaRegistry.SchemaAndMetadata version7 =
schemaRegistryService.getSchema(schemaId1, version(7)).get();
- assertNull(version7);
+ SchemaRegistry.SchemaAndMetadata version3 =
schemaRegistryService.getSchema(schemaId1, version(3)).get();
+ assertNull(version3);
}
@Test
public void writeSchemasToDifferentIds() throws Exception {
- SchemaData schemaWithDifferentId = schema3;
-
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
String schemaId2 = "id2";
- putSchema(schemaId2, schemaWithDifferentId, version(0));
+ putSchema(schemaId2, schemaData3, version(0));
SchemaData withFirstId = getLatestSchema(schemaId1, version(0));
SchemaData withDifferentId = getLatestSchema(schemaId2, version(0));
- assertEquals(schema1, withFirstId);
- assertEquals(schema3, withDifferentId);
+ assertEquals(schemaData1, withFirstId);
+ assertEquals(schemaData3, withDifferentId);
}
@Test
public void dontReAddExistingSchemaAtRoot() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
}
@Test
public void trimDeletedSchemaAndGetListTest() throws Exception {
List<SchemaAndMetadata> list = new ArrayList<>();
CompletableFuture<SchemaVersion> put =
schemaRegistryService.putSchemaIfAbsent(
- schemaId1, schema1, SchemaCompatibilityStrategy.FULL);
+ schemaId1, schemaData1, SchemaCompatibilityStrategy.FULL);
SchemaVersion newVersion = put.get();
- list.add(new SchemaAndMetadata(schemaId1, schema1, newVersion));
+ list.add(new SchemaAndMetadata(schemaId1, schemaData1, newVersion));
put = schemaRegistryService.putSchemaIfAbsent(
- schemaId1, schema2, SchemaCompatibilityStrategy.FULL);
+ schemaId1, schemaData2, SchemaCompatibilityStrategy.FULL);
newVersion = put.get();
- list.add(new SchemaAndMetadata(schemaId1, schema2, newVersion));
+ list.add(new SchemaAndMetadata(schemaId1, schemaData2, newVersion));
List<SchemaAndMetadata> list1 =
schemaRegistryService.trimDeletedSchemaAndGetList(schemaId1).get();
assertEquals(list.size(), list1.size());
HashFunction hashFunction = Hashing.sha256();
@@ -285,34 +248,14 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
@Test
public void dontReAddExistingSchemaInMiddle() throws Exception {
- putSchema(schemaId1, randomSchema(), version(0));
- putSchema(schemaId1, schema2, version(1));
- putSchema(schemaId1, randomSchema(), version(2));
- putSchema(schemaId1, randomSchema(), version(3));
- putSchema(schemaId1, randomSchema(), version(4));
- putSchema(schemaId1, randomSchema(), version(5));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
+ putSchema(schemaId1, schemaData3, version(2));
+ putSchema(schemaId1, schemaData2, version(1));
}
@Test(expectedExceptions = ExecutionException.class)
public void checkIsCompatible() throws Exception {
- String schemaJson1 =
-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema"
+
-
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
- SchemaData schemaData1 = getSchemaData(schemaJson1);
-
- String schemaJson2 =
-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema"
+
-
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
+
-
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
- SchemaData schemaData2 = getSchemaData(schemaJson2);
-
- String schemaJson3 =
-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema"
+
-
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
+
- "{\"name\":\"field2\",\"type\":\"string\"}]}";
- SchemaData schemaData3 = getSchemaData(schemaJson3);
-
putSchema(schemaId1, schemaData1, version(0),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
putSchema(schemaId1, schemaData2, version(1),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
@@ -381,7 +324,7 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
.build();
}
- private SchemaData getSchemaData(String schemaJson) {
+ private static SchemaData getSchemaData(String schemaJson) {
return
SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).user(userId).build();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index b8580fd..595da2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -21,12 +21,13 @@ package org.apache.pulsar.schema;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.Collections;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -37,6 +38,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -193,4 +195,15 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
consumer.close();
consumer1.close();
}
+
+ @Test
+ public void testIsUsingAvroSchemaParser() { // checks isUsingAvroSchemaParser against every SchemaType constant
+ for (SchemaType value : SchemaType.values()) {
+ if (value == SchemaType.AVRO || value == SchemaType.JSON || value
== SchemaType.PROTOBUF) {
+
assertTrue(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(value)); // only AVRO, JSON and PROTOBUF are expected to return true
+ } else {
+
assertFalse(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(value)); // every other constant is expected to return false
+ }
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 37c1942..565aeb5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.Schemas;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -45,8 +47,11 @@ import org.testng.annotations.Test;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
@Slf4j
@@ -294,6 +299,53 @@ public class SchemaCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
producer.close();
}
+ @Test
+ public void testSchemaComparison() throws Exception { // stores a PersonOne AVRO schema with trailing text appended, disables schema auto-update, then checks which producers can still be created
+ final String tenant = PUBLIC_TENANT;
+ final String topic = "test-schema-comparison";
+ // build a fully qualified persistent topic name inside a randomly named namespace
+ String namespace = "test-namespace-" + randomName(16);
+ String fqtn = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topic
+ ).toString();
+
+ NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME)
+ );
+ // NOTE(review): the freshly created namespace is asserted to report FULL - presumably the broker default; confirm
+
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+ SchemaCompatibilityStrategy.FULL);
+ byte[] changeSchemaBytes = (new
String(Schema.AVRO(Schemas.PersonOne.class)
+ .getSchemaInfo().getSchema(), UTF_8) + "/n /n
/n").getBytes();
+ SchemaInfo schemaInfo =
SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
+ admin.schemas().createSchema(fqtn, schemaInfo); // registers the PersonOne schema text plus the appended suffix as the topic's schema
+ // from here on the namespace no longer allows schema auto-update
+
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+ ProducerBuilder<Schemas.PersonOne> producerOneBuilder = pulsarClient
+ .newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(fqtn);
+ producerOneBuilder.create().close(); // must succeed even though the producer's schema bytes differ from the stored bytes
+ // the stored schema bytes must still be the suffixed variant, i.e. creating the PersonOne producer did not replace them
+ assertArrayEquals(changeSchemaBytes,
admin.schemas().getSchemaInfo(fqtn).getSchema());
+ // PersonThree was never registered on this topic, so creating its producer is expected to fail
+ ProducerBuilder<Schemas.PersonThree> producerThreeBuilder =
pulsarClient
+ .newProducer(Schema.AVRO(Schemas.PersonThree.class))
+ .topic(fqtn);
+
+ try {
+ producerThreeBuilder.create();
+ fail(); // reaching this line means the unknown schema was accepted
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Schema not found and
schema auto updating is disabled."));
+ }
+ }
+
@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void
testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityStrategy
schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;