codelipenghui commented on a change in pull request #5227: [PIP-44] Separate schema compatibility checker for producer and consumer URL: https://github.com/apache/pulsar/pull/5227#discussion_r333425375
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java ########## @@ -109,61 +110,55 @@ @Override @NotNull public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema, - SchemaCompatibilityStrategy strategy) { - return getSchema(schemaId, SchemaVersion.Latest) - .thenCompose( - (existingSchema) -> - { - CompletableFuture<Long> maxDeleteVersionFuture; - if (existingSchema == null) { - maxDeleteVersionFuture = completedFuture(NO_DELETED_VERSION); - } else if (existingSchema.schema.isDeleted()) { - maxDeleteVersionFuture = completedFuture(((LongSchemaVersion)schemaStorage - .versionFromBytes(existingSchema.version.bytes())).getVersion()); + SchemaCompatibilityStrategy strategy, boolean isAllowAutoUpdateSchema) { + return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> { + final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>(); + SchemaVersion schemaVersion; + for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) { + if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(), + hashFunction.hashBytes(schema.getData()).asBytes())) { + schemaVersion = schemaAndMetadata.version; + completableFuture.complete(schemaVersion); + return completableFuture; + } + } + if (isAllowAutoUpdateSchema) { + CompletableFuture<Void> isCompatibility = new CompletableFuture<>(); + if (schemaAndMetadataList.size() != 0) { + if (isTransitiveStrategy(strategy)) { + isCompatibility = checkCompatibilityWithAll(schema, strategy, schemaAndMetadataList); } else { - if (isTransitiveStrategy(strategy)) { - maxDeleteVersionFuture = checkCompatibilityWithAll(schemaId, schema, strategy); - - } else { - maxDeleteVersionFuture = new CompletableFuture<>(); - trimDeletedSchemaAndGetList(schemaId).thenAccept(schemaAndMetadataList -> { - checkCompatibilityWithLatest(schemaId, schema, strategy).whenComplete((v, ex) -> { - if (ex == null) { - Long maxDeleteVersion = ((LongSchemaVersion)schemaStorage - .versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L; - maxDeleteVersionFuture.complete(maxDeleteVersion); - } else { - maxDeleteVersionFuture.completeExceptionally(ex); - } - }); - }); - } + isCompatibility = checkCompatibilityWithLatest(schemaId, schema, strategy); } - return maxDeleteVersionFuture; + } else { + isCompatibility.complete(null); } - ).thenCompose(maxDeleteVersion -> { - byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); - SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() - .setType(Functions.convertFromDomainType(schema.getType())) - .setSchema(ByteString.copyFrom(schema.getData())) - .setSchemaId(schemaId) - .setUser(schema.getUser()) - .setDeleted(false) - .setTimestamp(clock.millis()) - .addAllProps(toPairs(schema.getProps())) - .build(); - return schemaStorage.put(schemaId, info.toByteArray(), context, maxDeleteVersion); - }); + return isCompatibility.thenCompose((v) -> { + byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); + SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(clock.millis()) + .addAllProps(toPairs(schema.getProps())) + .build(); + return schemaStorage.put(schemaId, info.toByteArray(), context); + + }); + } else { + return FutureUtils.exception(new IncompatibleSchemaException("Don't allow auto update schema.")); + } + }); } @Override @NotNull public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) { byte[] deletedEntry = deleted(schemaId, user).toByteArray(); return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> - schemaStorage.put(schemaId, deletedEntry, new byte[]{}, ((LongSchemaVersion)schemaStorage - .versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L)); - + schemaStorage.put(schemaId, deletedEntry, new byte[]{})); Review comment: No need to call trimDeletedSchemaAndGetList ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services