This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b45adde [Issue 9602] Add schema type validation (#9797)
b45adde is described below
commit b45added9d27adf5218ca005a82f4faa18b7ec44
Author: limingnihao <[email protected]>
AuthorDate: Fri Mar 12 15:37:05 2021 +0800
[Issue 9602] Add schema type validation (#9797)
Fixes #9602
### Motivation
The schema should not be able to modify the type after it is first created.
Unless he is Always compatible.
### Modifications
When uploading a schema, it is verified that the type of the newly uploaded
schema is consistent with the schema in the metadata.
When checkCompatible, change the Primitive type from checking only to
checking all.
### Verifying this change
* When a producer is created, the schema is defined, and when a producer is
created again, the schema type is not always the same, otherwise an exception
will be thrown.
* When a producer is created, the schema is defined, and when a consumer is
created again, the schema type is not always the same, otherwise an exception
will be thrown.
* When a consumer is created, the schema is defined, and when a producer is
created again, the schema type is not always the same, otherwise an exception
will be thrown.
* When a consumer is created, the schema is defined, and when a consumer is
created again, the schema type is not always the same, otherwise an exception
will be thrown.
* When compatibility is set to always compatible. No exceptions are thrown.
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- The schema: (yes)
---
.../service/schema/SchemaRegistryServiceImpl.java | 28 +-
.../SchemaTypeCompatibilityCheckTest.java | 468 +++++++++++++++++++++
2 files changed, 485 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index d9dd91f..b06531a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -140,6 +140,15 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
SchemaCompatibilityStrategy strategy) {
return
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
getSchemaVersionBySchemaData(schemaAndMetadataList,
schema).thenCompose(schemaVersion -> {
+ if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE &&
schemaAndMetadataList.size() > 0) {
+ for (SchemaAndMetadata metadata : schemaAndMetadataList) {
+ if (schema.getType() != metadata.schema.getType()) {
+ return FutureUtil.failedFuture(new
IncompatibleSchemaException(
+ String.format("Incompatible schema: exists
schema type %s, new schema type %s",
+ metadata.schema.getType(), schema.getType())));
+ }
+ }
+ }
if (schemaVersion != null) {
return CompletableFuture.completedFuture(schemaVersion);
}
@@ -242,17 +251,14 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
SchemaHash existingHash = SchemaHash.of(existingSchema.schema);
SchemaHash newHash = SchemaHash.of(newSchema);
SchemaData existingSchemaData = existingSchema.schema;
- if (existingSchemaData.getType().isPrimitive()) {
- if (newSchema.getType() != existingSchemaData.getType()) {
- throw new
IncompatibleSchemaException(String.format("Incompatible primitive schema: "
- + "exists schema type %s, new schema type %s",
- existingSchemaData.getType(), newSchema.getType()));
- }
- } else {
- if (!newHash.equals(existingHash)) {
- compatibilityChecks.getOrDefault(newSchema.getType(),
SchemaCompatibilityCheck.DEFAULT)
- .checkCompatible(existingSchemaData, newSchema,
strategy);
- }
+ if (newSchema.getType() != existingSchemaData.getType()) {
+ throw new IncompatibleSchemaException(String.format("Incompatible
schema: "
+ + "exists schema type %s, new schema type %s",
+ existingSchemaData.getType(), newSchema.getType()));
+ }
+ if (!newHash.equals(existingHash)) {
+ compatibilityChecks.getOrDefault(newSchema.getType(),
SchemaCompatibilityCheck.DEFAULT)
+ .checkCompatible(existingSchemaData, newSchema, strategy);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
new file mode 100644
index 0000000..c24822f
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema.compatibility;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.schema.Schemas;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+public class SchemaTypeCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
+ private final static String CLUSTER_NAME = "test";
+ private final static String PUBLIC_TENANT = "public";
+ private final static String namespace = "test-namespace";
+ private final static String namespaceName = PUBLIC_TENANT + "/" +
namespace;
+
+ @BeforeClass
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+
+ // Setup namespaces
+ admin.clusters().createCluster(CLUSTER_NAME, new
ClusterData(pulsar.getBrokerServiceUrl()));
+
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Collections.singleton(CLUSTER_NAME));
+ admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+ admin.namespaces().createNamespace(namespaceName,
Sets.newHashSet(CLUSTER_NAME));
+
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void structTypeProducerProducerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeProducerProducerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .create();
+
+ ProducerBuilder producerBuilder =
pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName);
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
producerBuilder::create);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type JSON, new schema type AVRO"));
+ }
+
+ @Test
+ public void structTypeProducerConsumerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeProducerConsumerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .create();
+
+ ConsumerBuilder consumerBuilder =
pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName);
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
consumerBuilder::subscribe);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type JSON, new schema type AVRO"));
+ }
+
+ @Test
+ public void structTypeConsumerProducerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeConsumerProducerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName)
+ .subscribe();
+
+ ProducerBuilder producerBuilder =
pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName);
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
producerBuilder::create);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type JSON, new schema type AVRO"));
+ }
+
+ @Test
+ public void structTypeConsumerConsumerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeConsumerConsumerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "1")
+ .subscribe();
+
+ ConsumerBuilder consumerBuilder =
pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "2");
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
consumerBuilder::subscribe);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type JSON, new schema type AVRO"));
+ }
+
+ @Test
+ public void structTypeProducerProducerAlwaysCompatible() throws Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeProducerProducerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .create();
+
+ pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName)
+ .create();
+ }
+
+ @Test
+ public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeProducerConsumerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .create();
+
+ pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName)
+ .subscribe();
+ }
+
+ @Test
+ public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeConsumerProducerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName)
+ .subscribe();
+
+ pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName)
+ .create();
+ }
+
+ @Test
+ public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "structTypeConsumerConsumerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "1")
+ .subscribe();
+
+ pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "2")
+ .subscribe();
+ }
+
+ @Test
+ public void primitiveTypeProducerProducerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeProducerProducerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .create();
+
+ ProducerBuilder producerBuilder =
pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName);
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
producerBuilder::create);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type INT32, new schema type STRING"));
+ }
+
+ @Test
+ public void primitiveTypeProducerConsumerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeProducerConsumerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .create();
+
+ ConsumerBuilder consumerBuilder =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName);
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
consumerBuilder::subscribe);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type INT32, new schema type STRING"));
+ }
+
+ @Test
+ public void primitiveTypeConsumerProducerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeConsumerProducerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName)
+ .subscribe();
+
+ ProducerBuilder producerBuilder =
pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName);
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
producerBuilder::create);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type INT32, new schema type STRING"));
+ }
+
+ @Test
+ public void primitiveTypeConsumerConsumerUndefinedCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.UNDEFINED);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeConsumerConsumerUndefinedCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "1")
+ .subscribe();
+
+ ConsumerBuilder consumerBuilder =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "2");
+
+ Throwable t =
expectThrows(PulsarClientException.IncompatibleSchemaException.class,
consumerBuilder::subscribe);
+ assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema
type INT32, new schema type STRING"));
+ }
+
+ @Test
+ public void primitiveTypeProducerProducerAlwaysCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeProducerProducerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .create();
+
+ pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ }
+
+ @Test
+ public void primitiveTypeProducerConsumerAlwaysCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeProducerConsumerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .create();
+
+ pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "2")
+ .subscribe();
+ }
+
+ @Test
+ public void primitiveTypeConsumerProducerAlwaysCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeConsumerProducerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName)
+ .subscribe();
+
+ pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ }
+
+ @Test
+ public void primitiveTypeConsumerConsumerAlwaysCompatible() throws
Exception {
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String subName = "my-sub";
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "primitiveTypeConsumerConsumerAlwaysCompatible"
+ ).toString();
+
+ pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "1")
+ .subscribe();
+
+ pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionName(subName + "2")
+ .subscribe();
+ }
+
+}