This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22ac4c842e1f64cd1d84d54ee2bb4c10fb080caf
Author: lipenghui <[email protected]>
AuthorDate: Mon Apr 26 08:27:44 2021 +0800

    Fix schema type check issue when use always compatible strategy (#10367)
    
    Related to #9797
    
    Fix schema type check issue when use always compatible strategy.
    
    1. For non-transitive strategy, only check schema type for the last schema
    2. For transitive strategy, check all schema's type
    3. Get schema by schema data should consider different schema types
    
    (cherry picked from commit 04f8c96a6c0c1c93cd495f46fb33d6e44d6004ea)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  75 ++++---
 .../SchemaTypeCompatibilityCheckTest.java          | 225 ++++-----------------
 2 files changed, 92 insertions(+), 208 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 37f7c56..0f493ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -141,15 +141,6 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                                                               
SchemaCompatibilityStrategy strategy) {
         return 
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
                 getSchemaVersionBySchemaData(schemaAndMetadataList, 
schema).thenCompose(schemaVersion -> {
-            if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && 
schemaAndMetadataList.size() > 0) {
-                for (SchemaAndMetadata metadata : schemaAndMetadataList) {
-                    if (schema.getType() != metadata.schema.getType()) {
-                        return FutureUtil.failedFuture(new 
IncompatibleSchemaException(
-                                String.format("Incompatible schema: exists 
schema type %s, new schema type %s",
-                                metadata.schema.getType(), schema.getType())));
-                    }
-                }
-            }
             if (schemaVersion != null) {
                 return CompletableFuture.completedFuture(schemaVersion);
             }
@@ -298,6 +289,9 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
     public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
             List<SchemaAndMetadata> schemaAndMetadataList,
             SchemaData schemaData) {
+        if (schemaAndMetadataList == null || schemaAndMetadataList.size() == 
0) {
+            return CompletableFuture.completedFuture(null);
+        }
         final CompletableFuture<SchemaVersion> completableFuture = new 
CompletableFuture<>();
         SchemaVersion schemaVersion;
         if (isUsingAvroSchemaParser(schemaData.getType())) {
@@ -308,14 +302,15 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                 if (isUsingAvroSchemaParser(schemaData.getType())) {
                     Schema.Parser existParser = new Schema.Parser();
                     Schema existSchema = existParser.parse(new 
String(schemaAndMetadata.schema.getData(), UTF_8));
-                    if (newSchema.equals(existSchema)) {
+                    if (newSchema.equals(existSchema) && 
schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
                     }
                 } else {
                     if 
(Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                            
hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                            
hashFunction.hashBytes(schemaData.getData()).asBytes())
+                            && schemaAndMetadata.schema.getType() == 
schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
@@ -325,7 +320,8 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         } else {
             for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
                 if 
(Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                        
hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                        hashFunction.hashBytes(schemaData.getData()).asBytes())
+                        && schemaAndMetadata.schema.getType() == 
schemaData.getType()) {
                     schemaVersion = schemaAndMetadata.version;
                     completableFuture.complete(schemaVersion);
                     return completableFuture;
@@ -338,14 +334,23 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
 
     private CompletableFuture<Void> checkCompatibilityWithLatest(String 
schemaId, SchemaData schema,
                                                                     
SchemaCompatibilityStrategy strategy) {
+        if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
+            return CompletableFuture.completedFuture(null);
+        }
         return getSchema(schemaId).thenCompose(existingSchema -> {
             if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                 CompletableFuture<Void> result = new CompletableFuture<>();
-                try {
-                    checkCompatible(existingSchema, schema, strategy);
-                    result.complete(null);
-                } catch (IncompatibleSchemaException e) {
-                    result.completeExceptionally(e);
+                if (existingSchema.schema.getType() != schema.getType()) {
+                    result.completeExceptionally(new 
IncompatibleSchemaException(
+                            String.format("Incompatible schema: exists schema 
type %s, new schema type %s",
+                                    existingSchema.schema.getType(), 
schema.getType())));
+                } else {
+                    try {
+                        checkCompatible(existingSchema, schema, strategy);
+                        result.complete(null);
+                    } catch (IncompatibleSchemaException e) {
+                        result.completeExceptionally(e);
+                    }
                 }
                 return result;
             } else {
@@ -365,17 +370,35 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                                                               
SchemaCompatibilityStrategy strategy,
                                                               
List<SchemaAndMetadata> schemaAndMetadataList) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        try {
-            compatibilityChecks.getOrDefault(schema.getType(), 
SchemaCompatibilityCheck.DEFAULT).checkCompatible(schemaAndMetadataList
-                    .stream()
-                    .map(schemaAndMetadata -> schemaAndMetadata.schema)
-                    .collect(Collectors.toList()), schema, strategy);
+        if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
             result.complete(null);
-        } catch (Exception e) {
-            if (e instanceof IncompatibleSchemaException) {
-                result.completeExceptionally(e);
+        } else {
+            SchemaAndMetadata breakSchema = null;
+            for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+                if (schemaAndMetadata.schema.getType() != schema.getType()) {
+                    breakSchema = schemaAndMetadata;
+                    break;
+                }
+            }
+            if (breakSchema == null) {
+                try {
+                    compatibilityChecks.getOrDefault(schema.getType(), 
SchemaCompatibilityCheck.DEFAULT)
+                            .checkCompatible(schemaAndMetadataList
+                                    .stream()
+                                    .map(schemaAndMetadata -> 
schemaAndMetadata.schema)
+                                    .collect(Collectors.toList()), schema, 
strategy);
+                    result.complete(null);
+                } catch (Exception e) {
+                    if (e instanceof IncompatibleSchemaException) {
+                        result.completeExceptionally(e);
+                    } else {
+                        result.completeExceptionally(new 
IncompatibleSchemaException(e));
+                    }
+                }
             } else {
-                result.completeExceptionally(new 
IncompatibleSchemaException(e));
+                result.completeExceptionally(new IncompatibleSchemaException(
+                        String.format("Incompatible schema: exists schema type 
%s, new schema type %s",
+                                breakSchema.schema.getType(), 
schema.getType())));
             }
         }
         return result;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index c24822f..bc711c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -30,14 +30,18 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.schema.Schemas;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 
@@ -173,101 +177,6 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void structTypeProducerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-    }
-
-    @Test
-    public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
     public void primitiveTypeProducerProducerUndefinedCompatible() throws 
Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
 
@@ -371,98 +280,50 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void primitiveTypeProducerProducerAlwaysCompatible() throws 
Exception {
+    public void testAlwaysCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
+        final String topicName = TopicName.get(
                 TopicDomain.persistent.value(),
                 PUBLIC_TENANT,
                 namespace,
-                "primitiveTypeProducerProducerAlwaysCompatible"
+                "testAlwaysCompatible" + UUID.randomUUID().toString()
         ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeProducerConsumerAlwaysCompatible() throws 
Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
-    public void primitiveTypeConsumerProducerAlwaysCompatible() throws 
Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeConsumerConsumerAlwaysCompatible() throws 
Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
+        Schema<?>[] schemas = new Schema[] {
+                Schema.AVRO(Schemas.PersonOne.class),
+                Schema.AVRO(Schemas.PersonFour.class),
+                Schema.JSON(Schemas.PersonOne.class),
+                Schema.JSON(Schemas.PersonFour.class),
+                Schema.INT8,
+                Schema.INT16,
+                Schema.INT32,
+                Schema.INT64,
+                Schema.DATE,
+                Schema.BOOL,
+                Schema.DOUBLE,
+                Schema.STRING,
+                Schema.BYTES,
+                Schema.FLOAT,
+                Schema.INSTANT,
+                Schema.BYTEBUFFER,
+                Schema.TIME,
+                Schema.TIMESTAMP,
+                Schema.LOCAL_DATE,
+                Schema.LOCAL_DATE_TIME,
+                Schema.LOCAL_TIME
+        };
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newProducer(schema)
+                    .topic(topicName)
+                    .create();
+        }
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newConsumer(schema)
+                    .topic(topicName)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .subscribe();
+        }
     }
 
 }

Reply via email to