This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 8b62272 [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
8b62272 is described below
commit 8b622722dcb2c9d99e40b9e9d5b713a845677f42
Author: JiangHaiting <[email protected]>
AuthorDate: Thu Nov 18 20:37:37 2021 +0800
[Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
(cherry picked from commit fa7be236efcc6772e0aac05f25f8d5f3cf0ad741)
---
conf/broker.conf | 4 ++
conf/standalone.conf | 4 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 8 +++
.../pulsar/broker/service/AbstractTopic.java | 38 ++++++----
.../SchemaCompatibilityCheckTest.java | 81 ++++++++++++++++++++++
.../pulsar/common/policies/data/Policies.java | 2 +-
6 files changed, 121 insertions(+), 16 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 552bdb4..a51a5f7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -264,6 +264,10 @@ brokerMaxConnections=0
# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0
+# Allow schema to be auto updated at broker level. User can override this by
+# 'is_allow_auto_update_schema' of namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 7f52eca..d8c1370 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -169,6 +169,10 @@ defaultNumberOfNamespaceBundles=4
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0
+# Allow schema to be auto updated at broker level. User can override this by
+# 'is_allow_auto_update_schema' of namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5eb0a6b..21c0e5f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -554,6 +554,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int brokerMaxConnectionsPerIp = 0;
@FieldContext(
+ category = CATEGORY_POLICIES,
+ dynamic = true,
+ doc = "Allow schema to be auto updated at broker level. User can
override this by 'is_allow_auto_update_schema'"
+ + " of namespace policy. This is enabled by default."
+ )
+ private boolean isAllowAutoUpdateSchemaEnabled = true;
+
+ @FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Enable check for minimum allowed client library version"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ebd05b6..5d24147 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic {
@Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy
=
SchemaCompatibilityStrategy.FULL;
- protected volatile boolean isAllowAutoUpdateSchema = true;
+ protected volatile Boolean isAllowAutoUpdateSchema;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;
@@ -330,20 +330,28 @@ public abstract class AbstractTopic implements Topic {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService =
brokerService.pulsar().getSchemaRegistryService();
- return isAllowAutoUpdateSchema ? schemaRegistryService
- .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
- :
schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList
->
-
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList,
schema)
- .thenCompose(schemaVersion -> {
- if (schemaVersion == null) {
- return FutureUtil
- .failedFuture(
- new IncompatibleSchemaException(
- "Schema not found and schema
auto updating is disabled."));
- } else {
- return
CompletableFuture.completedFuture(schemaVersion);
- }
- }));
+
+ if (allowAutoUpdateSchema()) {
+ return schemaRegistryService.putSchemaIfAbsent(id, schema,
schemaCompatibilityStrategy);
+ } else {
+ return
schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList
->
+
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList,
schema)
+ .thenCompose(schemaVersion -> {
+ if (schemaVersion == null) {
+ return FutureUtil.failedFuture(new
IncompatibleSchemaException(
+ "Schema not found and schema auto
updating is disabled."));
+ } else {
+ return
CompletableFuture.completedFuture(schemaVersion);
+ }
+ }));
+ }
+ }
+
+ private boolean allowAutoUpdateSchema() {
+ if (isAllowAutoUpdateSchema == null) {
+ return
brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
+ }
+ return isAllowAutoUpdateSchema;
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 293f71d..80168b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -218,6 +218,87 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
+ public void
testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy
schemaCompatibilityStrategy)
+ throws Exception {
+
+ final String tenant = PUBLIC_TENANT;
+ final String topic = "test-consumer-compatibility";
+ String namespace = "test-namespace-" + randomName(16);
+ String fqtn = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topic
+ ).toString();
+
+ NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME)
+ );
+
+
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+ SchemaCompatibilityStrategy.FULL);
+
+
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(),
schemaCompatibilityStrategy);
+ admin.schemas().createSchema(fqtn,
Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
+
+
+ pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+ ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
+
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()))
+ .topic(fqtn);
+ try {
+ producerThreeBuilder.create();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Schema not found and
schema auto updating is disabled."));
+ }
+
+ pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
+ ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder =
pulsarClient.newConsumer(Schema.AVRO(
+
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+
(false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()))
+ .subscriptionName("test")
+ .topic(fqtn);
+
+ Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
+ Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();
+
+ producer.send(new Schemas.PersonTwo(2, "Lucy"));
+ Message<Schemas.PersonTwo> message = consumerTwo.receive();
+
+ Schemas.PersonTwo personTwo = message.getValue();
+ consumerTwo.acknowledge(message);
+
+ assertEquals(personTwo.getId(), 2);
+ assertEquals(personTwo.getName(), "Lucy");
+
+ producer.close();
+ consumerTwo.close();
+
+ pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+
+ producer = producerThreeBuilder.create();
+ consumerTwo = comsumerBuilder.subscribe();
+
+ producer.send(new Schemas.PersonTwo(2, "Lucy"));
+ message = consumerTwo.receive();
+
+ personTwo = message.getValue();
+ consumerTwo.acknowledge(message);
+
+ assertEquals(personTwo.getId(), 2);
+ assertEquals(personTwo.getName(), "Lucy");
+
+ consumerTwo.close();
+ producer.close();
+ }
+
@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy
schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index ff773f6..8622715 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -107,7 +107,7 @@ public class Policies {
public SchemaCompatibilityStrategy schema_compatibility_strategy =
SchemaCompatibilityStrategy.UNDEFINED;
@SuppressWarnings("checkstyle:MemberName")
- public boolean is_allow_auto_update_schema = true;
+ public Boolean is_allow_auto_update_schema = null;
@SuppressWarnings("checkstyle:MemberName")
public boolean schema_validation_enforced = false;