This is an automated email from the ASF dual-hosted git repository.
urfree pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push:
new 4deabb6da34 Docs sync done from apache/pulsar(#e6f4aae)
4deabb6da34 is described below
commit 4deabb6da34e5413e4f420a71aa9f1af0ca91def
Author: Pulsar Site Updater <[email protected]>
AuthorDate: Sat Nov 5 06:02:03 2022 +0000
Docs sync done from apache/pulsar(#e6f4aae)
---
site2/website-next/docs/admin-api-schemas.md | 605 +++++++++++++++-
site2/website-next/docs/client-libraries-cpp.md | 78 +--
site2/website-next/docs/client-libraries-java.md | 91 +--
.../docs/schema-evolution-compatibility.md | 124 ++--
site2/website-next/docs/schema-get-started.md | 472 +++++++++++--
site2/website-next/docs/schema-manage.md | 766 +--------------------
site2/website-next/docs/schema-overview.md | 154 +++++
site2/website-next/docs/schema-understand.md | 497 +++----------
site2/website-next/sidebars.json | 7 +-
9 files changed, 1318 insertions(+), 1476 deletions(-)
diff --git a/site2/website-next/docs/admin-api-schemas.md
b/site2/website-next/docs/admin-api-schemas.md
index 8399a03b872..15f11f4a91b 100644
--- a/site2/website-next/docs/admin-api-schemas.md
+++ b/site2/website-next/docs/admin-api-schemas.md
@@ -1,6 +1,609 @@
---
id: admin-api-schemas
-title: Managing Schemas
+title: Manage Schemas
sidebar_label: "Schemas"
---
+
+````mdx-code-block
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+````
+
+:::tip
+
+This page only shows **some frequently used operations**.
+
+- For the latest and complete information about `Pulsar admin`, including commands, flags, descriptions, and more, see [Pulsar admin doc](/tools/pulsar-admin/).
+
+- For the latest and complete information about `REST API`, including
parameters, responses, samples, and more, see {@inject: rest:REST:/} API doc.
+
+- For the latest and complete information about `Java admin API`, including
classes, methods, descriptions, and more, see [Java admin API doc](/api/admin/).
+
+:::
+
+## Manage AutoUpdate strategy
+
+### Enable AutoUpdate
+
+To enable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable
tenant/namespace
+```
+
+### Disable AutoUpdate
+
+To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable
tenant/namespace
+```
+
+Once the `AutoUpdate` is disabled, you can only register a new schema using
the `pulsar-admin` command.
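+
+For example, assuming a hypothetical topic `my-tenant/my-ns/my-topic` and a schema definition file `schema.json`, you can register a schema manually with the `schemas upload` subcommand described later on this page:
+
+```bash
+bin/pulsar-admin schemas upload --filename schema.json my-tenant/my-ns/my-topic
+```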
+
+### Adjust compatibility
+
+To adjust the schema compatibility level on a namespace, you can use the
`pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility
<compatibility-level> tenant/namespace
+```
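+
+For example, the following sketch applies the `BACKWARD` compatibility level to a hypothetical `my-tenant/my-ns` namespace:
+
+```bash
+bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility BACKWARD my-tenant/my-ns
+```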
+
+## Schema validation
+
+### Enable schema validation
+
+To enable `schemaValidationEnforced` on a namespace, you can use the
`pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-schema-validation-enforce --enable
tenant/namespace
+```
+
+### Disable schema validation
+
+To disable `schemaValidationEnforced` on a namespace, you can use the
`pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-schema-validation-enforce --disable
tenant/namespace
+```
+
+## Schema manual management
+
+### Upload a schema
+
+To upload (register) a new schema for a topic, you can use one of the
following methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `upload` subcommand.
+
+```bash
+pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
+```
+
+The `schema-definition-file` is in JSON format.
+
+```json
+{
+ "type": "<schema-type>",
+ "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+ "properties": {} // the properties associated with the schema
+}
+```
+
+The `schema-definition-file` includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `type` | The schema type. |
+| `schema` | The schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> |
+| `properties` | The additional properties associated with the schema. |
+
+Here are examples of the `schema-definition-file` for a JSON schema.
+
+**Example 1**
+
+```json
+{
+ "type": "JSON",
+ "schema":
"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
+ "properties": {}
+}
+```
+
+**Example 2**
+
+```json
+{
+ "type": "STRING",
+ "schema": "",
+ "properties": {
+ "key1": "value1"
+ }
+}
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `POST` request to this endpoint: {@inject:
endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema?version=@pulsar:version_number@}
+
+The post payload is in JSON format.
+
+```json
+{
+ "type": "<schema-type>",
+ "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+ "properties": {} // the properties associated with the schema
+}
+```
+
+The post payload includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `type` | The schema type. |
+| `schema` | The schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> |
+| `properties` | The additional properties associated with the schema. |
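+
+For example, a minimal `curl` sketch, assuming a broker admin service at `localhost:8080`, a hypothetical `public/default/my-topic` topic, and no authentication:
+
+```bash
+curl -X POST http://localhost:8080/admin/v2/schemas/public/default/my-topic/schema \
+  -H "Content-Type: application/json" \
+  -d '{"type": "STRING", "schema": "", "properties": {}}'
+```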
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+void createSchema(String topic, PostSchemaPayload schemaPayload)
+```
+
+The `PostSchemaPayload` includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `type` | The schema type. |
+| `schema` | The schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> |
+| `properties` | The additional properties associated with the schema. |
+
+Here is an example of `PostSchemaPayload`:
+
+```java
+PulsarAdmin admin = …;
+
+PostSchemaPayload payload = new PostSchemaPayload();
+payload.setType("INT8");
+payload.setSchema("");
+
+admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+### Get a schema (latest)
+
+To get the latest schema for a topic, you can use one of the following
methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `get` subcommand.
+
+```bash
+pulsar-admin schemas get <topic-name>
+
+{
+ "version": 0,
+ "type": "String",
+ "timestamp": 0,
+ "data": "string",
+ "properties": {
+ "property1": "string",
+ "property2": "string"
+ }
+}
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `GET` request to this endpoint: {@inject:
endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema?version=@pulsar:version_number@}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+{
+ "version": "<the-version-number-of-the-schema>",
+ "type": "<the-schema-type>",
+ "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+ "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+ "properties": {} // the properties associated with the schema
+}
+```
+
+The response includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `version` | The schema version, which is a long number. |
+| `type` | The schema type. |
+| `timestamp` | The timestamp when this version of the schema was created. |
+| `data` | The schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> |
+| `properties` | The additional properties associated with the schema. |
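+
+The same endpoint can be called with `curl`, for example (assuming a broker admin service at `localhost:8080` and a hypothetical `public/default/my-topic` topic):
+
+```bash
+curl http://localhost:8080/admin/v2/schemas/public/default/my-topic/schema
+```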
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+SchemaInfo getSchemaInfo(String topic)
+```
+
+The `SchemaInfo` includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `name` | The schema name. |
+| `type` | The schema type. |
+| `schema` | A byte array of the schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this byte array should be empty. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array. </li> |
+| `properties` | The additional properties associated with the schema. |
+
+Here is an example of `SchemaInfo`:
+
+```java
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+### Get a schema (specific)
+
+To get a specific version of a schema, you can use one of the following
methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `get` subcommand.
+
+```bash
+pulsar-admin schemas get <topic-name> --version=<version>
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `GET` request to a schema endpoint: {@inject:
endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema?version=@pulsar:version_number@}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+{
+ "version": "<the-version-number-of-the-schema>",
+ "type": "<the-schema-type>",
+ "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+ "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+ "properties": {} // the properties associated with the schema
+}
+```
+
+The response includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `version` | The schema version, which is a long number. |
+| `type` | The schema type. |
+| `timestamp` | The timestamp when this version of the schema was created. |
+| `data` | The schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> |
+| `properties` | The additional properties associated with the schema. |
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+SchemaInfo getSchemaInfo(String topic, long version)
+```
+
+The `SchemaInfo` includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `name` | The schema name. |
+| `type` | The schema type. |
+| `schema` | A byte array of the schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this byte array should be empty. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array. </li> |
+| `properties` | The additional properties associated with the schema. |
+
+Here is an example of `SchemaInfo`:
+
+```java
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+### Extract a schema
+
+To extract a schema from a class and provide it via a topic, you can use the following method.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `extract` subcommand.
+
+```bash
+pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type
<type-name>
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+### Delete a schema
+
+To delete a schema for a topic, you can use one of the following methods.
+
+:::note
+
+Regardless of the method you use, the **delete** action deletes **all versions** of the schema registered for a topic.
+
+:::
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `delete` subcommand.
+
+```bash
+pulsar-admin schemas delete <topic-name>
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `DELETE` request to a schema endpoint: {@inject:
endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema?version=@pulsar:version_number@}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+{
+ "version": "<the-latest-version-number-of-the-schema>",
+}
+```
+
+The response includes the following field:
+
+| Field | Description |
+| --- | --- |
+| `version` | The schema version, which is a long number. |
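+
+For example, a `curl` sketch against a hypothetical `public/default/my-topic` topic, assuming a broker admin service at `localhost:8080` and no authentication:
+
+```bash
+curl -X DELETE http://localhost:8080/admin/v2/schemas/public/default/my-topic/schema
+```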
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+void deleteSchema(String topic)
+```
+
+Here is an example of deleting a schema.
+
+```java
+PulsarAdmin admin = …;
+
+admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+## Set schema compatibility check strategy
+
+You can set [schema compatibility check
strategy](schema-evolution-compatibility.md#schema-compatibility-check-strategy)
at the topic, namespace or broker level.
+
+The schema compatibility check strategy set at different levels takes the following priority: topic level > namespace level > broker level, as illustrated in the sketch after the following list.
+
+- If you set the strategy at both topic and namespace levels, it uses the
topic-level strategy.
+
+- If you set the strategy at both namespace and broker levels, it uses the
namespace-level strategy.
+
+- If you do not set the strategy at any level, it uses the `FULL` strategy.
For all available values, see
[here](schema-evolution-compatibility.md#schema-compatibility-check-strategy).
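+
+For example, the following sketch (with hypothetical tenant, namespace, and topic names) sets `FULL` at the namespace level and `BACKWARD` at the topic level; the topic then uses `BACKWARD` because the topic-level strategy takes precedence:
+
+```bash
+# namespace-level strategy (hypothetical names)
+pulsar-admin namespaces set-schema-compatibility-strategy --compatibility FULL my-tenant/my-ns
+# topic-level strategy overrides the namespace-level one for this topic
+pulsar-admin topicPolicies set-schema-compatibility-strategy BACKWARD persistent://my-tenant/my-ns/my-topic
+```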
+
+
+### Topic level
+
+To set a schema compatibility check strategy at the topic level, use one of
the following methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the [`pulsar-admin topicPolicies
set-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
+
+```shell
+pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy>
<topicName>
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `PUT` request to this endpoint: {@inject:
endpoint|PUT|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy
strategy)
+```
+
+Here is an example of setting a schema compatibility check strategy at the
topic level.
+
+```java
+PulsarAdmin admin = …;
+
+admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic",
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+To get the topic-level schema compatibility check strategy, use one of the
following methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the [`pulsar-admin topicPolicies
get-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
+
+```shell
+pulsar-admin topicPolicies get-schema-compatibility-strategy <topicName>
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `GET` request to this endpoint: {@inject:
endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic,
boolean applied)
+```
+
+Here is an example of getting the topic-level schema compatibility check
strategy.
+
+```java
+PulsarAdmin admin = …;
+
+// get the current applied schema compatibility strategy
+admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic",
true);
+
+// only get the schema compatibility strategy from topic policies
+admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic",
false);
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+To remove the topic-level schema compatibility check strategy, use one of the
following methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the [`pulsar-admin topicPolicies
remove-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
+
+```shell
+pulsar-admin topicPolicies remove-schema-compatibility-strategy <topicName>
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `DELETE` request to this endpoint: {@inject:
endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+void removeSchemaCompatibilityStrategy(String topic)
+```
+
+Here is an example of removing the topic-level schema compatibility check
strategy.
+
+```java
+PulsarAdmin admin = …;
+
+admin.topicPolicies().removeSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic");
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+### Namespace level
+
+You can set the schema compatibility check strategy at the namespace level using one of the following methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Admin CLI"
+ values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the [`pulsar-admin namespaces
set-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
+
+```shell
+pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `PUT` request to this endpoint: {@inject:
endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+Use the [`setSchemaCompatibilityStrategy`](/api/admin/) method.
+
+```java
+admin.namespaces().setSchemaCompatibilityStrategy("test",
SchemaCompatibilityStrategy.FULL);
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+### Broker level
+
+You can set the schema compatibility check strategy at the broker level by setting `schemaCompatibilityStrategy` in the `conf/broker.conf` or `conf/standalone.conf` file.
+
+```conf
+schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE
+```
\ No newline at end of file
diff --git a/site2/website-next/docs/client-libraries-cpp.md
b/site2/website-next/docs/client-libraries-cpp.md
index 7f56d3cc727..7547487d10d 100644
--- a/site2/website-next/docs/client-libraries-cpp.md
+++ b/site2/website-next/docs/client-libraries-cpp.md
@@ -412,80 +412,4 @@ For complete examples, refer to [C++ client
examples](https://github.com/apache/
## Schema
-This section describes some examples about schema. For more information about
schema, see [Pulsar schema](schema-get-started.md).
-
-### Avro schema
-
-- The following example shows how to create a producer with an Avro schema.
-
- ```cpp
- static const std::string exampleSchema =
- "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
- Producer producer;
- ProducerConfiguration producerConf;
- producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
- client.createProducer("topic-avro", producerConf, producer);
- ```
-
-- The following example shows how to create a consumer with an Avro schema.
-
- ```cpp
- static const std::string exampleSchema =
- "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
- ConsumerConfiguration consumerConf;
- Consumer consumer;
- consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
- client.subscribe("topic-avro", "sub-2", consumerConf, consumer)
- ```
-
-### ProtobufNative schema
-
-The following example shows how to create a producer and a consumer with a
ProtobufNative schema.
-
-1. Generate the `User` class using Protobuf3 or later versions.
-
- ```protobuf
- syntax = "proto3";
-
- message User {
- string name = 1;
- int32 age = 2;
- }
- ```
-
-2. Include the `ProtobufNativeSchema.h` in your source code. Ensure the
Protobuf dependency has been added to your project.
-
- ```cpp
- #include <pulsar/ProtobufNativeSchema.h>
- ```
-
-3. Create a producer to send a `User` instance.
-
- ```cpp
- ProducerConfiguration producerConf;
- producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
- Producer producer;
- client.createProducer("topic-protobuf", producerConf, producer);
- User user;
- user.set_name("my-name");
- user.set_age(10);
- std::string content;
- user.SerializeToString(&content);
- producer.send(MessageBuilder().setContent(content).build());
- ```
-
-4. Create a consumer to receive a `User` instance.
-
- ```cpp
- ConsumerConfiguration consumerConf;
- consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
- consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
- Consumer consumer;
- client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer);
- Message msg;
- consumer.receive(msg);
- User user2;
- user2.ParseFromArray(msg.getData(), msg.getLength());
- ```
+To work with [Pulsar schema](schema-overview.md) using C++ clients, see
[Schema - Get started](schema-get-started.md). For specific schema types that
C++ clients support, see
[code](https://github.com/apache/pulsar-client-cpp/blob/main/include/pulsar/Schema.h).
\ No newline at end of file
diff --git a/site2/website-next/docs/client-libraries-java.md
b/site2/website-next/docs/client-libraries-java.md
index e36a3957b1a..35339c776c2 100644
--- a/site2/website-next/docs/client-libraries-java.md
+++ b/site2/website-next/docs/client-libraries-java.md
@@ -1158,7 +1158,7 @@ tv.forEach((key, value) -> /*operations on all existing
messages*/)
## Schema
-In Pulsar, all message data consists of byte arrays "under the hood." [Message
schemas](schema-get-started.md) enable you to use other types of data when
constructing and handling messages (from simple types like strings to more
complex, application-specific types). If you construct, say, a
[producer](#producer) without specifying a schema, then the producer can only
produce messages of type `byte[]`. The following is an example.
+In Pulsar, all message data consists of byte arrays "under the hood." [Message
schemas](schema-overview.md) enable you to use other types of data when
constructing and handling messages (from simple types like strings to more
complex, application-specific types). If you construct, say, a
[producer](#producer) without specifying a schema, then the producer can only
produce messages of type `byte[]`. The following is an example.
```java
Producer<byte[]> producer = client.newProducer()
@@ -1166,95 +1166,8 @@ Producer<byte[]> producer = client.newProducer()
.create();
```
-The producer above is equivalent to a `Producer<byte[]>` (in fact, you should
*always* explicitly specify the type). If you'd like to use a producer for a
different type of data, you'll need to specify a **schema** that informs Pulsar
which data type will be transmitted over the
[topic](reference-terminology.md#topic).
+The producer above is equivalent to a `Producer<byte[]>` (in fact, you should
*always* explicitly specify the type). If you'd like to use a producer for a
different type of data, you need to specify a **schema** that informs Pulsar
which data type will be transmitted over the topic. For more examples, see
[Schema - Get started](schema-get-started.md).
-### AvroBaseStructSchema example
-
-Let's say that you have a `SensorReading` class that you'd like to transmit
over a Pulsar topic:
-
-```java
-public class SensorReading {
- public float temperature;
-
- public SensorReading(float temperature) {
- this.temperature = temperature;
- }
-
- // A no-arg constructor is required
- public SensorReading() {
- }
-
- public float getTemperature() {
- return temperature;
- }
-
- public void setTemperature(float temperature) {
- this.temperature = temperature;
- }
-}
-```
-
-You could then create a `Producer<SensorReading>` (or
`Consumer<SensorReading>`) like this:
-
-```java
-Producer<SensorReading> producer =
client.newProducer(JSONSchema.of(SensorReading.class))
- .topic("sensor-readings")
- .create();
-```
-
-The following schema formats are currently available for Java:
-
-* No schema or the byte array schema (which can be applied using
`Schema.BYTES`):
-
- ```java
- Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
- .topic("some-raw-bytes-topic")
- .create();
- ```
-
- Or, equivalently:
-
- ```java
- Producer<byte[]> bytesProducer = client.newProducer()
- .topic("some-raw-bytes-topic")
- .create();
- ```
-
-* `String` for normal UTF-8-encoded string data. Apply the schema using
`Schema.STRING`:
-
- ```java
- Producer<String> stringProducer = client.newProducer(Schema.STRING)
- .topic("some-string-topic")
- .create();
- ```
-
-* Create JSON schemas for POJOs using `Schema.JSON`. The following is an
example.
-
- ```java
- Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
- .topic("some-pojo-topic")
- .create();
- ```
-
-* Generate Protobuf schemas using `Schema.PROTOBUF`. The following example
shows how to create the Protobuf schema and use it to instantiate a new
producer:
-
- ```java
- Producer<MyProtobuf> protobufProducer =
client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
- .topic("some-protobuf-topic")
- .create();
- ```
-
-* Define Avro schemas with `Schema.AVRO`. The following code snippet
demonstrates how to create and use Avro schema.
-
- ```java
- Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
- .topic("some-avro-topic")
- .create();
- ```
-
-### ProtobufNativeSchema example
-
-For examples of ProtobufNativeSchema, see [`SchemaDefinition` in `Complex
type`](schema-understand.md#complex-type).
## Authentication
diff --git a/site2/website-next/docs/schema-evolution-compatibility.md
b/site2/website-next/docs/schema-evolution-compatibility.md
index ad3432ba779..438eea638bf 100644
--- a/site2/website-next/docs/schema-evolution-compatibility.md
+++ b/site2/website-next/docs/schema-evolution-compatibility.md
@@ -6,29 +6,21 @@ sidebar_label: "Schema evolution and compatibility"
Normally, schemas do not stay the same over a long period of time. Instead,
they undergo evolutions to satisfy new needs.
-This chapter examines how Pulsar schema evolves and what Pulsar schema
compatibility check strategies are.
+This chapter introduces how Pulsar schema evolves and what compatibility check
strategies it adopts.
## Schema evolution
-Pulsar schema is defined in a data structure called `SchemaInfo`.
-
-Each `SchemaInfo` stored with a topic has a version. The version is used to
manage the schema changes happening within a topic.
-
The message produced with `SchemaInfo` is tagged with a schema version. When a
message is consumed by a Pulsar client, the Pulsar client can use the schema
version to retrieve the corresponding `SchemaInfo` and use the correct schema
information to deserialize data.
-### What is schema evolution?
-
Schemas store the details of attributes and types. To satisfy new business
requirements, you need to update schemas inevitably over time, which is called
**schema evolution**.
Any schema changes affect downstream consumers. Schema evolution ensures that
the downstream consumers can seamlessly handle data encoded with both old
schemas and new schemas.
-### How Pulsar schema should evolve?
-
-The answer is Pulsar schema compatibility check strategy. It determines how
schema compares old schemas with new schemas in topics.
+### How does schema evolve?
-For more information, see [Schema compatibility check
strategy](#schema-compatibility-check-strategy).
+The answer is the [schema compatibility check strategy](#schema-compatibility-check-strategy). It determines how the broker compares new schemas against existing schemas in a topic.
-### How does Pulsar support schema evolution?
+### How does Pulsar support schema evolution?
The process of how Pulsar supports schema evolution is described as follows.
@@ -53,31 +45,31 @@ For more details, see
[`schemaRegistryCompatibilityCheckers`](https://github.com
## Schema compatibility check strategy
-Pulsar has 8 schema compatibility check strategies, which are summarized in
the following table.
+The following table outlines the 8 schema compatibility check strategies and how each of them works.
-Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is
the oldest and V3 is the latest:
+Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is
the oldest and V3 is the latest.
| Compatibility check strategy | Definition | Changes allowed |
Check against which schema | Upgrade first |
| --- | --- | --- | --- | --- |
| `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | All
changes are allowed | All previous versions | Any order |
-| `ALWAYS_INCOMPATIBLE` | Disable schema evolution. | All changes are
disabled | None | None |
-| `BACKWARD` | Consumers using schema V3 can process data written by
producers using schema V3 or V2. | <li>Add optional fields </li><li>Delete
fields </li> | Latest version | Consumers |
-| `BACKWARD_TRANSITIVE` | Consumers using schema V3 can process data
written by producers using schema V3, V2 or V1. | <li>Add optional fields
</li><li>Delete fields </li> | All previous versions | Consumers |
-| `FORWARD` | Consumers using schema V3 or V2 can process data written by
producers using schema V3. | <li>Add fields </li><li>Delete optional fields
</li> | Latest version | Producers |
-| `FORWARD_TRANSITIVE` | Consumers using schema V3, V2, or V1 can process
data written by producers using schema V3. | <li>Add fields </li><li>Delete
optional fields </li> | All previous versions | Producers |
-| `FULL` | Backward and forward compatible between the schema V3 and V2.
| <li>Modify optional fields </li> | Latest version | Any order |
-| `FULL_TRANSITIVE` | Backward and forward compatible among schema V3, V2,
and V1. | <li>Modify optional fields </li> | All previous versions |
Any order |
-
-### ALWAYS_COMPATIBLE and ALWAYS_INCOMPATIBLE
-
-| Compatibility check strategy | Definition | Note |
-| --- | --- | --- |
-| `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | None |
-| `ALWAYS_INCOMPATIBLE` | Disable schema evolution, that is, any schema
change is rejected. | <li>For all schema types except Avro and JSON, the
default schema compatibility check strategy is `ALWAYS_INCOMPATIBLE`.
</li><li>For Avro and JSON, the default schema compatibility check strategy is
`FULL`. </li> |
-
-#### Example
-
-* Example 1
+| `ALWAYS_INCOMPATIBLE` | Disable schema evolution, that is, any schema
change is rejected. | All changes are disabled | None | None |
+| `BACKWARD` | Consumers using schema V3 can process data written by
producers using the **last schema version** V2. | <li>Add optional fields
</li><li>Delete fields </li> | Latest version | Consumers |
+| `BACKWARD_TRANSITIVE` | Consumers using schema V3 can process data
written by producers using **all previous schema versions** V2 and V1. |
<li>Add optional fields </li><li>Delete fields </li> | All previous versions
| Consumers |
+| `FORWARD` | Consumers using the **last schema version** V2 can process
data written by producers using a new schema V3, even though they may not be
able to use the full capabilities of the new schema. | <li>Add fields
</li><li>Delete optional fields </li> | Latest version | Producers |
+| `FORWARD_TRANSITIVE` | Consumers using **all previous schema versions**
V2 or V1 can process data written by producers using a new schema V3. |
<li>Add fields </li><li>Delete optional fields </li> | All previous versions
| Producers |
+| `FULL` | Schemas are both backward and forward compatible. <li>Consumers
using the last schema V2 can process data written by producers using the new
schema V3. </li><li>Consumers using the new schema V3 can process data written
by producers using the last schema V2.</li> | <li>Modify optional fields
</li> | Latest version | Any order |
+| `FULL_TRANSITIVE` | Backward and forward compatible among schema V3, V2,
and V1. <li>Consumers using the schema V3 can process data written by producers
using schema V2 and V1. </li><li>Consumers using the schema V2 or V1 can
process data written by producers using the schema V3.</li> | <li>Modify
optional fields </li> | All previous versions | Any order |
+
+:::tip
+
+* The default schema compatibility check strategy varies depending on schema
types.
+ * For Avro and JSON, the default one is `FULL`.
+ * For others, the default one is `ALWAYS_INCOMPATIBLE`.
+* You can set schema compatibility check strategy at the topic, namespace or
broker level. For how to set the strategy, see
[here](admin-api-schemas.md#set-schema-compatibility-check-strategy).
+
+:::
+
+### ALWAYS_COMPATIBLE example
In some situations, an application needs to store events of several
different types in the same Pulsar topic.
@@ -85,13 +77,11 @@ Suppose that you have a topic containing three schemas (V1,
V2, and V3), V1 is t
For example, for a user entity, there are `userCreated`,
`userAddressChanged` and `userEnquiryReceived` events. The application requires
that those events are always read in the same order.
- Consequently, those events need to go in the same Pulsar partition to
maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different
kinds of events to co-exist in the same topic.
-
-* Example 2
+ Consequently, those events need to go in the same Pulsar partition to
maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different
kinds of events to co-exist on the same topic.
- Sometimes we also make incompatible changes.
+### ALWAYS_INCOMPATIBLE example
- For example, you are modifying a field type from `string` to `int`.
+ Sometimes you need to make incompatible changes, for example, modifying a field type from `string` to `int`.
In this case, you need to:
@@ -99,16 +89,7 @@ Suppose that you have a topic containing three schemas (V1,
V2, and V3), V1 is t
* Optionally, create a new topic and start migrating applications to use the
new topic and the new schema, avoiding the need to handle two incompatible
versions in the same topic.
-### BACKWARD and BACKWARD_TRANSITIVE
-
-Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is
the oldest and V3 is the latest:
-
-| Compatibility check strategy | Definition
| Description
|
-|------------------------------|------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------|
-| `BACKWARD` | Consumers using the new schema can process
data written by producers using the **last schema**. | The consumers using
the schema V3 can process data written by producers using the schema V3 or V2.
|
-| `BACKWARD_TRANSITIVE` | Consumers using the new schema can process
data written by producers using **all previous schemas**. | The consumers using
the schema V3 can process data written by producers using the schema V3, V2, or
V1. |
-
-#### Example
+### BACKWARD and BACKWARD_TRANSITIVE example
* Example 1
@@ -122,17 +103,8 @@ Suppose that you have a topic containing three schemas
(V1, V2, and V3), V1 is t
Same SQL queries must continue to work even if the data is changed. To
support it, you can evolve the schemas using the `BACKWARD` strategy.
-### FORWARD and FORWARD_TRANSITIVE
+### FORWARD and FORWARD_TRANSITIVE example
-Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is
the oldest and V3 is the latest:
-
-| Compatibility check strategy | Definition
| Description
|
-|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------|
-| `FORWARD` | Consumers using the **last schema** can
process data written by producers using a new schema, even though they may not
be able to use the full capabilities of the new schema. | The consumers using
the schema V3 or V2 can process data written by producers using the schema V3.
|
-| `FORWARD_TRANSITIVE` | Consumers using **all previous schemas** can
process data written by producers using a new schema.
| The consumers using the
schema V3, V2, or V1 can process data written by producers using the schema V3.
|
-
-#### Example
-
* Example 1
Add a field.
@@ -147,40 +119,28 @@ Suppose that you have a topic containing three schemas
(V1, V2, and V3), V1 is t
Consequently, you can evolve the schemas using the `FORWARD` strategy to
ensure that the old schema can process data encoded with the new schema.
-### FULL and FULL_TRANSITIVE
-
-Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is
the oldest and V3 is the latest:
-
-| Compatibility check strategy | Definition | Description | Note |
-| --- | --- | --- | --- |
-| `FULL` | Schemas are both backward and forward compatible, which means:
Consumers using the last schema can process data written by producers using the
new schema. AND Consumers using the new schema can process data written by
producers using the last schema. | Consumers using the schema V3 can process
data written by producers using the schema V3 or V2. AND Consumers using the
schema V3 or V2 can process data written by producers using the schema V3. |
<li>For Avro and JSON, [...]
-| `FULL_TRANSITIVE` | The new schema is backward and forward compatible
with all previously registered schemas. | Consumers using the schema V3 can
process data written by producers using the schema V3, V2 or V1. AND Consumers
using the schema V3, V2 or V1 can process data written by producers using the
schema V3. | None |
-
-#### Example
+### FULL and FULL_TRANSITIVE example
In some data formats, for example, Avro, you can define fields with default
values. Consequently, adding or removing a field with a default value is a
fully compatible change.
-:::tip
-You can set schema compatibility check strategy at the topic, namespace or
broker level. For how to set the strategy, see
[here](schema-manage.md#set-schema-compatibility-check-strategy).
-
-:::
-
-## Schema verification
+## Schema validation
When a producer or a consumer tries to connect to a topic, a broker performs
some checks to verify a schema.
-### Producer
+### Validation on producers
-When a producer tries to connect to a topic (suppose ignore the schema
auto-creation), a broker does the following checks:
+By default, `schemaValidationEnforced` is **disabled** for producers, which means:
+* A producer without a schema can produce any kind of message to a topic with schemas, which may result in producing trash data to the topic.
+* Non-Java clients that do not support schemas can still produce messages to a topic with schemas.
-* Check if the schema carried by the producer exists in the schema registry or
not.
+However, if you want a stronger guarantee on the topics with schemas, you can
enable `schemaValidationEnforced` across the whole cluster or on a
per-namespace basis.
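+
+For example, a sketch of enabling it for a hypothetical `my-tenant/my-ns` namespace with the `pulsar-admin` command described in [Manage Schemas](admin-api-schemas.md#schema-validation):
+
+```bash
+bin/pulsar-admin namespaces set-schema-validation-enforce --enable my-tenant/my-ns
+```
+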
+With `schemaValidationEnforced` enabled, when a producer tries to connect to a topic (ignoring schema auto-creation), the broker checks whether the schema carried by the producer exists in the schema registry.
* If the schema is already registered, then the producer is connected to a
broker and produces messages with that schema.
-
* If the schema is not registered, then Pulsar verifies if the schema is
allowed to be registered based on the configured compatibility check strategy.
-### Consumer
+### Validation on consumers
When a consumer tries to connect to a topic, a broker checks if a carried
schema is compatible with a registered schema based on the configured schema
compatibility check strategy.
@@ -197,7 +157,7 @@ When a consumer tries to connect to a topic, a broker
checks if a carried schema
## Order of upgrading clients
-The order of upgrading client applications is determined by the compatibility
check strategy.
+The order of upgrading client applications is determined by the [schema
compatibility check strategy](#schema-compatibility-check-strategy).
For example, the producers use schemas to write data to Pulsar and the
consumers use schemas to read data from Pulsar.
@@ -207,8 +167,4 @@ For example, the producers use schemas to write data to
Pulsar and the consumers
| `ALWAYS_INCOMPATIBLE` | None | The schema evolution is disabled.
|
| <li>`BACKWARD` </li><li>`BACKWARD_TRANSITIVE` </li> | Consumers | There
is no guarantee that consumers using the old schema can read data produced
using the new schema. Consequently, **upgrade all consumers first**, and then
start producing new data.
|
| <li>`FORWARD` </li><li>`FORWARD_TRANSITIVE` </li> | Producers | There is
no guarantee that consumers using the new schema can read data produced using
the old schema. Consequently, **upgrade all producers first**<li>to use the new
schema and ensure that the data already produced using the old schemas are not
available to consumers, and then upgrades the consumers. </li> |
-| <li>`FULL` </li><li>`FULL_TRANSITIVE` </li> | Any order | It is
guaranteed that consumers using the old schema can read data produced using the
new schema and consumers using the new schema can read data produced using the
old schema. Consequently, you can upgrade the producers and consumers in **any
order**. |
-
-
-
-
+| <li>`FULL` </li><li>`FULL_TRANSITIVE` </li> | Any order | It is
guaranteed that consumers using the old schema can read data produced using the
new schema and consumers using the new schema can read data produced using the
old schema. Consequently, you can upgrade the producers and consumers in **any
order**. |
\ No newline at end of file
diff --git a/site2/website-next/docs/schema-get-started.md
b/site2/website-next/docs/schema-get-started.md
index be7a288dc7d..5b6cea677e3 100644
--- a/site2/website-next/docs/schema-get-started.md
+++ b/site2/website-next/docs/schema-get-started.md
@@ -4,92 +4,470 @@ title: Get started
sidebar_label: "Get started"
---
-This chapter introduces Pulsar schemas and explains why they are important.
-## Schema Registry
+````mdx-code-block
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+````
-Type safety is extremely important in any application built around a message
bus like Pulsar.
-Producers and consumers need some kind of mechanism for coordinating types at
the topic level to avoid various potential problems arising. For example,
serialization and deserialization issues.
+This hands-on tutorial provides instructions and examples on how to construct
and customize schemas.
-Applications typically adopt one of the following approaches to guarantee type
safety in messaging. Both approaches are available in Pulsar, and you're free
to adopt one or the other or to mix and match on a per-topic basis.
+## Construct a string schema
-#### Note
->
-> Currently, the Pulsar schema registry is only available for the [Java
client](client-libraries-java.md), [Go client](client-libraries-go.md), [Python
client](client-libraries-python.md), and [C++ client](client-libraries-cpp.md).
+This example demonstrates how to construct a [string
schema](schema-understand.md#primitive-type) and use it to produce and consume
messages in Java.
-### Client-side approach
+1. Create a producer with a string schema and send messages.
-Producers and consumers are responsible for not only serializing and
deserializing messages (which consist of raw bytes) but also "knowing" which
types are being transmitted via which topics.
+ ```java
+ Producer<String> producer = client.newProducer(Schema.STRING).create();
+ producer.newMessage().value("Hello Pulsar!").send();
+ ```
-If a producer is sending temperature sensor data on the topic `topic-1`,
consumers of that topic will run into trouble if they attempt to parse that
data as moisture sensor readings.
+2. Create a consumer with a string schema and receive messages.
-Producers and consumers can send and receive messages consisting of raw byte
arrays and leave all type safety enforcement to the application on an
"out-of-band" basis.
+ ```java
+ Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
+ consumer.receive();
+ ```
-### Server-side approach
+## Construct a key/value schema
-Producers and consumers inform the system which data types can be transmitted
via the topic.
+This example shows how to construct a [key/value
schema](schema-understand.md#keyvalue-schema) and use it to produce and consume
messages in Java.
-With this approach, the messaging system enforces type safety and ensures that
producers and consumers remain synced.
+1. Construct a key/value schema with `INLINE` encoding type.
-Pulsar has a built-in **schema registry** that enables clients to upload data
schemas on a per-topic basis. Those schemas dictate which data types are
recognized as valid for that topic.
+ ```java
+ Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+ Schema.INT32,
+ Schema.STRING,
+ KeyValueEncodingType.INLINE
+ );
+ ```
-## Why use schema
+2. Optionally, construct a key/value schema with `SEPARATED` encoding type.
-When a schema is enabled, Pulsar does parse data, it takes bytes as inputs and
sends bytes as outputs. While data has meaning beyond bytes, you need to parse
data and might encounter parse exceptions which mainly occur in the following
situations:
+ ```java
+ Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+ Schema.INT32,
+ Schema.STRING,
+ KeyValueEncodingType.SEPARATED
+ );
+ ```
-* The field does not exist
+3. Produce messages using a key/value schema.
-* The field type has changed (for example, `string` is changed to `int`)
+ ```java
+ Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
+ .topic(TOPIC)
+ .create();
-There are a few methods to prevent and overcome these exceptions, for example,
you can catch exceptions when parsing errors, which makes code hard to
maintain; or you can adopt a schema management system to perform schema
evolution, not to break downstream applications, and enforces type safety to
max extend in the language you are using, the solution is Pulsar Schema.
+ final int key = 100;
+ final String value = "value-100";
-Pulsar schema enables you to use language-specific types of data when
constructing and handling messages from simple types like `string` to more
complex application-specific types.
+ // send the key/value message
+ producer.newMessage()
+ .value(new KeyValue(key, value))
+ .send();
+ ```
+
+4. Consume messages using a key/value schema.
+
+ ```java
+ Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
+ ...
+ .topic(TOPIC)
+ .subscriptionName(SubscriptionName).subscribe();
+
+ // receive key/value pair
+ Message<KeyValue<Integer, String>> msg = consumer.receive();
+ KeyValue<Integer, String> kv = msg.getValue();
+ ```
+
+## Construct a struct schema
+
+This example shows how to construct a [struct
schema](schema-understand.md#struct-schema) and use it to produce and consume
messages using different methods.
+
+````mdx-code-block
+<Tabs
+ defaultValue="static"
+
values={[{"label":"static","value":"static"},{"label":"generic","value":"generic"},{"label":"SchemaDefinition","value":"SchemaDefinition"}]}>
+
+<TabItem value="static">
+
+You can predefine the `struct` schema, which can be a POJO in Java, a `struct`
in Go, or classes generated by Avro or Protobuf tools.
+
+**Example**
+
+Pulsar gets the schema definition from the predefined `struct` using an Avro
library. The schema definition is the schema data stored as a part of the
`SchemaInfo`.
+
+1. Create the _User_ class to define the messages sent to Pulsar topics.
+
+ ```java
+ @Builder
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class User {
+ String name;
+ int age;
+ }
+ ```
+
+2. Create a producer with a `struct` schema and send messages.
+
+ ```java
+ Producer<User> producer =
client.newProducer(Schema.AVRO(User.class)).create();
+
producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
+ ```
+
+3. Create a consumer with a `struct` schema and receive messages.
+
+ ```java
+ Consumer<User> consumer =
client.newConsumer(Schema.AVRO(User.class)).subscribe();
+ User user = consumer.receive().getValue();
+ ```
+
+</TabItem>
+<TabItem value="generic">
+
+Sometimes applications do not have pre-defined structs. In this case, you can use this method to define a schema and access the data.
+
+You can define the `struct` schema using the `GenericSchemaBuilder`, generate
a generic struct using `GenericRecordBuilder` and consume messages into
`GenericRecord`.
+
+**Example**
+
+1. Use `RecordSchemaBuilder` to build a schema.
+
+ ```java
+ RecordSchemaBuilder recordSchemaBuilder =
SchemaBuilder.record("schemaName");
+ recordSchemaBuilder.field("intField").type(SchemaType.INT32);
+ SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
+
+    GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);
+    Producer<GenericRecord> producer = client.newProducer(schema).create();
+ ```
+
+2. Use `RecordBuilder` to build the struct records.
+
+ ```java
+ producer.newMessage().value(schema.newRecordBuilder()
+ .set("intField", 32)
+ .build()).send();
+ ```
+
+</TabItem>
+<TabItem value="SchemaDefinition">
+
+You can define a `SchemaDefinition` to generate a `struct` schema.
**Example**
-You can use the _User_ class to define the messages sent to Pulsar topics.
+1. Create the _User_ class to define the messages sent to Pulsar topics.
+
+ ```java
+ @Builder
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class User {
+ String name;
+ int age;
+ }
+ ```
+
+2. Create a producer with a `SchemaDefinition` and send messages.
+
+ ```java
+ SchemaDefinition<User> schemaDefinition =
SchemaDefinition.<User>builder().withPojo(User.class).build();
+ Producer<User> producer =
client.newProducer(Schema.AVRO(schemaDefinition)).create();
+
producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
+ ```
+
+3. Create a consumer with a `SchemaDefinition` schema and receive messages.
+
+ ```java
+ SchemaDefinition<User> schemaDefinition =
SchemaDefinition.<User>builder().withPojo(User.class).build();
+ Consumer<User> consumer =
client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe();
+ User user = consumer.receive().getValue();
+ ```
+
+</TabItem>
+
+</Tabs>
+````
+
+### Avro schema using Java
+
+Suppose you have a `SensorReading` class as follows, and you'd like to
transmit it over a Pulsar topic.
```java
-public class User {
- String name;
- int age;
+public class SensorReading {
+ public float temperature;
+
+ public SensorReading(float temperature) {
+ this.temperature = temperature;
+ }
+
+ // A no-arg constructor is required
+ public SensorReading() {
+ }
+
+ public float getTemperature() {
+ return temperature;
+ }
+
+ public void setTemperature(float temperature) {
+ this.temperature = temperature;
+ }
}
```
-When constructing a producer with the _User_ class, you can specify a schema
or not as below.
+Create a `Producer<SensorReading>` (or `Consumer<SensorReading>`) like this:
+
+```java
+Producer<SensorReading> producer =
client.newProducer(AvroSchema.of(SensorReading.class))
+ .topic("sensor-readings")
+ .create();
+```
+
+### Avro-based schema using Java
+
+The following schema formats are currently available for Java:
+
+* No schema or the byte array schema (which can be applied using
`Schema.BYTES`):
+
+ ```java
+ Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
+ .topic("some-raw-bytes-topic")
+ .create();
+ ```
+
+ Or, equivalently:
+
+ ```java
+ Producer<byte[]> bytesProducer = client.newProducer()
+ .topic("some-raw-bytes-topic")
+ .create();
+ ```
+
+* `String` for normal UTF-8-encoded string data. Apply the schema using
`Schema.STRING`:
+
+ ```java
+ Producer<String> stringProducer = client.newProducer(Schema.STRING)
+ .topic("some-string-topic")
+ .create();
+ ```
+
+* Create JSON schemas for POJOs using `Schema.JSON`. The following is an
example.
+
+ ```java
+ Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
+ .topic("some-pojo-topic")
+ .create();
+ ```
+
+* Generate Protobuf schemas using `Schema.PROTOBUF`. The following example
shows how to create the Protobuf schema and use it to instantiate a new
producer:
+
+ ```java
+ Producer<MyProtobuf> protobufProducer =
client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
+ .topic("some-protobuf-topic")
+ .create();
+ ```
+
+* Define Avro schemas with `Schema.AVRO`. The following code snippet
demonstrates how to create and use Avro schema.
+
+ ```java
+ Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
+ .topic("some-avro-topic")
+ .create();
+ ```
+
+
+### Avro schema using C++
+
+- The following example shows how to create a producer with an Avro schema.
+
+ ```cpp
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ Producer producer;
+ ProducerConfiguration producerConf;
+ producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.createProducer("topic-avro", producerConf, producer);
+ ```
+
+- The following example shows how to create a consumer with an Avro schema.
+
+ ```cpp
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ ConsumerConfiguration consumerConf;
+ Consumer consumer;
+ consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.subscribe("topic-avro", "sub-2", consumerConf, consumer)
+ ```
+
+### ProtobufNative schema using C++
+
+The following example shows how to create a producer and a consumer with a
ProtobufNative schema.
+
+1. Generate the `User` class using Protobuf3 or later versions.
+
+ ```protobuf
+ syntax = "proto3";
+
+ message User {
+ string name = 1;
+ int32 age = 2;
+ }
+ ```
+
+2. Include the `ProtobufNativeSchema.h` in your source code. Ensure the
Protobuf dependency has been added to your project.
+
+ ```cpp
+ #include <pulsar/ProtobufNativeSchema.h>
+ ```
+
+3. Create a producer to send a `User` instance.
+
+ ```cpp
+ ProducerConfiguration producerConf;
+ producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
+ Producer producer;
+ client.createProducer("topic-protobuf", producerConf, producer);
+ User user;
+ user.set_name("my-name");
+ user.set_age(10);
+ std::string content;
+ user.SerializeToString(&content);
+ producer.send(MessageBuilder().setContent(content).build());
+ ```
+
+4. Create a consumer to receive a `User` instance.
+
+ ```cpp
+ ConsumerConfiguration consumerConf;
+ consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
+ consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
+ Consumer consumer;
+ client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer);
+ Message msg;
+ consumer.receive(msg);
+ User user2;
+ user2.ParseFromArray(msg.getData(), msg.getLength());
+ ```
+
+
+## Construct an AUTO_PRODUCE schema
+
+Suppose you have a Pulsar topic _P_, a producer that processes messages from a Kafka topic _K_, and an application that reads the messages from _K_ and writes them to _P_.
+
+This example shows how to construct an
[AUTO_PRODUCE](schema-understand.md#auto-schema) schema to verify whether the
bytes produced by _K_ can be sent to _P_.
+
+```java
+Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
+ …
+ .create();
+
+byte[] kafkaMessageBytes = … ;
+
+pulsarProducer.send(kafkaMessageBytes);
+```
+
+## Construct an AUTO_CONSUME schema
+
+Suppose you have a Pulsar topic _P_, a consumer (for example, _MySQL_) that receives messages from the topic _P_, and an application that reads the messages from _P_ and writes them to _MySQL_.
+
+This example shows how to construct an [AUTO_CONSUME
schema](schema-understand.md#auto-schema) to verify whether the bytes produced
by _P_ can be sent to _MySQL_.
+
+```java
+Consumer<GenericRecord> pulsarConsumer =
client.newConsumer(Schema.AUTO_CONSUME())
+ …
+ .subscribe();
+
+Message<GenericRecord> msg = pulsarConsumer.receive();
+GenericRecord record = msg.getValue();
+```
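+
+Once decoded, the fields of the `GenericRecord` can be inspected without a generated class. A minimal sketch, assuming the record contains a hypothetical `name` field:
+
+```java
+// List every field described by the record's schema ...
+for (Field field : record.getFields()) {
+    System.out.println(field.getName() + " = " + record.getField(field));
+}
+// ... or read a single field by name.
+Object name = record.getField("name");
+```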
+
+## Construct a native Avro schema
+
+This example shows how to construct a [native Avro
schema](schema-understand.md#native-avro-schema).
+
+```java
+org.apache.avro.Schema nativeAvroSchema = … ;
+
+Producer<byte[]> producer =
pulsarClient.newProducer().topic("ingress").create();
+
+byte[] content = … ;
+
+producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();
+```
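+
+The wrapped `org.apache.avro.Schema` typically comes from the upstream system. A minimal sketch of parsing one from a JSON schema definition string, reusing the two-field record from the C++ example above:
+
+```java
+// Parse an Avro schema definition, for example one fetched from an external schema registry.
+org.apache.avro.Schema nativeAvroSchema = new org.apache.avro.Schema.Parser().parse(
+        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+      + "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}");
+```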
+
+## Customize schema storage
+
+By default, Pulsar stores various data types of schemas in [Apache
BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar.
Alternatively, you can use another storage system if needed.
-### Without schema
+To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you
need to implement the following Java interfaces:
+* [SchemaStorage interface](#schemastorage-interface)
+* [SchemaStorageFactory interface](#schemastoragefactory-interface)
-If you construct a producer without specifying a schema, then the producer can
only produce messages of type `byte[]`. If you have a POJO class, you need to
serialize the POJO into bytes before sending messages.
+### Implement SchemaStorage interface
-**Example**
+The `SchemaStorage` interface has the following methods:
```java
-Producer<byte[]> producer = client.newProducer()
- .topic(topic)
- .create();
-User user = new User("Tom", 28);
-byte[] message = … // serialize the `user` by yourself;
-producer.send(message);
+public interface SchemaStorage {
+ // How schemas are updated
+ CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[]
hash);
+
+ // How schemas are fetched from storage
+ CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+ // How schemas are deleted
+ CompletableFuture<SchemaVersion> delete(String key);
+
+ // Utility method for converting a schema version byte array to a
SchemaVersion object
+ SchemaVersion versionFromBytes(byte[] version);
+
+ // Startup behavior for the schema storage client
+ void start() throws Exception;
+
+ // Shutdown behavior for the schema storage client
+ void close() throws Exception;
+}
```
-### With schema
+:::tip
-If you construct a producer with specifying a schema, then you can send a
class to a topic directly without worrying about how to serialize POJOs into
bytes.
+For a complete example of **schema storage** implementation, see
[BookKeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java)
class.
-**Example**
+:::
-This example constructs a producer with the _JSONSchema_, and you can send the
_User_ class to topics directly without worrying about how to serialize it into
bytes.
+### Implement SchemaStorageFactory interface
+
+The `SchemaStorageFactory` interface has the following method:
```java
-Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
- .topic(topic)
- .create();
-User user = new User("Tom", 28);
-producer.send(user);
+public interface SchemaStorageFactory {
+ @NotNull
+ SchemaStorage create(PulsarService pulsar) throws Exception;
+}
```
-### Summary
+:::tip
+
+For a complete example of **schema storage factory** implementation, see
[BookKeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java)
class.
+
+:::
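+
+In practice, the factory is a thin wrapper that instantiates your storage class. A minimal sketch, assuming a hypothetical `MySchemaStorage` class that implements the `SchemaStorage` interface above (imports from the Pulsar broker module omitted):
+
+```java
+public class MySchemaStorageFactory implements SchemaStorageFactory {
+
+    @Override
+    public SchemaStorage create(PulsarService pulsar) throws Exception {
+        // Return the custom storage instance; its start()/close() lifecycle
+        // methods are declared by the SchemaStorage interface above.
+        return new MySchemaStorage();
+    }
+}
+```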
+
+### Deploy custom schema storage
+
+To use your custom schema storage implementation, perform the following steps.
-When constructing a producer with a schema, you do not need to serialize
messages into bytes, instead Pulsar schema does this job in the background.
+1. Package the implementation in a
[JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html)
file.
+
+2. Add the JAR file to the `lib` folder in your Pulsar binary or source
distribution.
+
+3. Change the `schemaRegistryStorageClassName` configuration in the `conf/broker.conf` file to your custom factory class, as shown in the example after this list.
+
+4. Start Pulsar.
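+
+For step 3, the entry in `conf/broker.conf` would look like the following sketch, assuming a hypothetical factory class name:
+
+```conf
+# Use the custom SchemaStorageFactory implementation instead of the default BookKeeper-based one.
+schemaRegistryStorageClassName=com.example.schema.MySchemaStorageFactory
+```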
diff --git a/site2/website-next/docs/schema-manage.md
b/site2/website-next/docs/schema-manage.md
index 8c65ac4d7b4..69311d10770 100644
--- a/site2/website-next/docs/schema-manage.md
+++ b/site2/website-next/docs/schema-manage.md
@@ -5,767 +5,7 @@ sidebar_label: "Manage schema"
---
````mdx-code-block
-import Tabs from '@theme/Tabs';
-import TabItem from '@theme/TabItem';
-````
-
-
-This guide demonstrates the ways to manage schemas:
-
-* Automatically
-
- * [Schema AutoUpdate](#schema-autoupdate)
-
-* Manually
-
- * [Schema manual management](#schema-manual-management)
-
- * [Custom schema storage](#custom-schema-storage)
-
-## Schema AutoUpdate
-
-If a schema passes the schema compatibility check, Pulsar producer
automatically updates this schema to the topic it produces by default.
-
-### AutoUpdate for producer
-
-For a producer, the `AutoUpdate` happens in the following cases:
-
-* If a **topic doesn’t have a schema**, Pulsar registers a schema
automatically.
-
-* If a **topic has a schema**:
-
- * If a **producer doesn’t carry a schema**:
-
- * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is
**disabled** in the namespace to which the topic belongs, the producer is
allowed to connect to the topic and produce data.
-
- * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is
**enabled** in the namespace to which the topic belongs, the producer is
rejected and disconnected.
-
- * If a **producer carries a schema**:
-
- A broker performs the compatibility check based on the configured
compatibility check strategy of the namespace to which the topic belongs.
-
- * If the schema is registered, a producer is connected to a broker.
-
- * If the schema is not registered:
-
- * If `isAllowAutoUpdateSchema` sets to **false**, the producer is
rejected to connect to a broker.
-
- * If `isAllowAutoUpdateSchema` sets to **true**:
-
- * If the schema passes the compatibility check, then the broker
registers a new schema automatically for the topic and the producer is
connected.
-
- * If the schema does not pass the compatibility check, then the
broker does not register a schema and the producer is rejected to connect to a
broker.
-
-
-
-### AutoUpdate for consumer
-
-For a consumer, the `AutoUpdate` happens in the following cases:
-
-* If a **consumer connects to a topic without a schema** (which means the
consumer receiving raw bytes), the consumer can connect to the topic
successfully without doing any compatibility check.
-
-* If a **consumer connects to a topic with a schema**.
-
- * If a topic does not have all of them (a schema/data/a local consumer and a
local producer):
-
- * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer
registers a schema and it is connected to a broker.
-
- * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is
rejected to connect to a broker.
-
- * If a topic has one of them (a schema/data/a local consumer and a local
producer), then the schema compatibility check is performed.
-
- * If the schema passes the compatibility check, then the consumer is
connected to the broker.
-
- * If the schema does not pass the compatibility check, then the consumer
is rejected to connect to the broker.
-
-
-
-
-### Manage AutoUpdate strategy
-
-You can use the `pulsar-admin` command to manage the `AutoUpdate` strategy as
below:
-
-* [Enable AutoUpdate](#enable-autoupdate)
-
-* [Disable AutoUpdate](#disable-autoupdate)
-
-* [Adjust compatibility](#adjust-compatibility)
-
-#### Enable AutoUpdate
-
-To enable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable
tenant/namespace
-```
-
-#### Disable AutoUpdate
-
-To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable
tenant/namespace
-```
-
-Once the `AutoUpdate` is disabled, you can only register a new schema using
the `pulsar-admin` command.
-
-#### Adjust compatibility
-
-To adjust the schema compatibility level on a namespace, you can use the
`pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility
<compatibility-level> tenant/namespace
-```
-
-### Schema validation
-
-By default, `schemaValidationEnforced` is **disabled** for producers:
-
-* This means a producer without a schema can produce any kind of messages to a
topic with schemas, which may result in producing trash data to the topic.
-
-* This allows non-java language clients that don’t support schema can produce
messages to a topic with schemas.
-
-However, if you want a stronger guarantee on the topics with schemas, you can
enable `schemaValidationEnforced` across the whole cluster or on a
per-namespace basis.
-
-#### Enable schema validation
-
-To enable `schemaValidationEnforced` on a namespace, you can use the
`pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-schema-validation-enforce --enable
tenant/namespace
-```
-
-#### Disable schema validation
-
-To disable `schemaValidationEnforced` on a namespace, you can use the
`pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-schema-validation-enforce --disable
tenant/namespace
-```
-
-## Schema manual management
-
-To manage schemas, you can use one of the following methods.
-
-| Method | Description |
-| --- | --- |
-| **Admin CLI**<li></li> | You can use the `pulsar-admin` tool to manage
Pulsar schemas, brokers, clusters, sources, sinks, topics, tenants and so on.
For more information about how to use the `pulsar-admin` tool, see
[here](/tools/pulsar-admin/). |
-| **REST API**<li></li> | Pulsar exposes schema related management API in
Pulsar’s admin RESTful API. You can access the admin RESTful endpoint directly
to manage schemas. For more information about how to use the Pulsar REST API,
see [here](/admin-rest-api/). |
-| **Java Admin API**<li></li> | Pulsar provides Java admin library. |
-
-### Upload a schema
-
-To upload (register) a new schema for a topic, you can use one of the
following methods.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the `upload` subcommand.
-
-```bash
-pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
-```
-
-The `schema-definition-file` is in JSON format.
-
-```json
-{
- "type": "<schema-type>",
- "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
- "properties": {} // the properties associated with the schema
-}
-```
-
-The `schema-definition-file` includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `type` | The schema type. |
-| `schema` | The schema definition data, which is encoded in UTF 8
charset. <li>If the schema is a </li>**primitive**<li>schema, this field should
be blank. </li><li>If the schema is a </li>**struct**<li>schema, this field
should be a JSON string of the Avro schema definition. </li> |
-| `properties` | The additional properties associated with the schema. |
-
-Here are examples of the `schema-definition-file` for a JSON schema.
-
-**Example 1**
-
-```json
-{
- "type": "JSON",
- "schema":
"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
- "properties": {}
-}
-```
-
-**Example 2**
-
-```json
-{
- "type": "STRING",
- "schema": "",
- "properties": {
- "key1": "value1"
- }
-}
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `POST` request to this endpoint: {@inject:
endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema?version=@pulsar:version_number@}
-
-The post payload is in JSON format.
-
-```json
-{
- "type": "<schema-type>",
- "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
- "properties": {} // the properties associated with the schema
-}
-```
-
-The post payload includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `type` | The schema type. |
-| `schema` | The schema definition data, which is encoded in UTF 8
charset. <li>If the schema is a </li>**primitive**<li>schema, this field should
be blank. </li><li>If the schema is a </li>**struct**<li>schema, this field
should be a JSON string of the Avro schema definition. </li> |
-| `properties` | The additional properties associated with the schema. |
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-```java
-void createSchema(String topic, PostSchemaPayload schemaPayload)
-```
-
-The `PostSchemaPayload` includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `type` | The schema type. |
-| `schema` | The schema definition data, which is encoded in UTF 8
charset. <li>If the schema is a </li>**primitive**<li>schema, this field should
be blank. </li><li>If the schema is a </li>**struct**<li>schema, this field
should be a JSON string of the Avro schema definition. </li> |
-| `properties` | The additional properties associated with the schema. |
-
-Here is an example of `PostSchemaPayload`:
-
-```java
-PulsarAdmin admin = …;
-
-PostSchemaPayload payload = new PostSchemaPayload();
-payload.setType("INT8");
-payload.setSchema("");
-
-admin.createSchema("my-tenant/my-ns/my-topic", payload);
-```
-
-</TabItem>
-
-</Tabs>
-````
-
-### Get a schema (latest)
-
-To get the latest schema for a topic, you can use one of the following
methods.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the `get` subcommand.
-
-```bash
-pulsar-admin schemas get <topic-name>
-
-{
- "version": 0,
- "type": "String",
- "timestamp": 0,
- "data": "string",
- "properties": {
- "property1": "string",
- "property2": "string"
- }
-}
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `GET` request to this endpoint: {@inject:
endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema?version=@pulsar:version_number@}
-
-Here is an example of a response, which is returned in JSON format.
-
-```json
-{
- "version": "<the-version-number-of-the-schema>",
- "type": "<the-schema-type>",
- "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
- "data": "<an-utf8-encoded-string-of-schema-definition-data>",
- "properties": {} // the properties associated with the schema
-}
-```
-
-The response includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `version` | The schema version, which is a long number. |
-| `type` | The schema type. |
-| `timestamp` | The timestamp of creating this version of schema. |
-| `data` | The schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a </li>**primitive**<li>schema, this field should be
blank. </li><li>If the schema is a </li>**struct**<li>schema, this field should
be a JSON string of the Avro schema definition. </li> |
-| `properties` | The additional properties associated with the schema. |
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-```java
-SchemaInfo createSchema(String topic)
-```
-
-The `SchemaInfo` includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `name` | The schema name. |
-| `type` | The schema type. |
-| `schema` | A byte array of the schema definition data, which is encoded
in UTF 8 charset. <li>If the schema is a </li>**primitive**<li>schema, this
byte array should be empty. </li><li>If the schema is a
</li>**struct**<li>schema, this field should be a JSON string of the Avro
schema definition converted to a byte array. </li> |
-| `properties` | The additional properties associated with the schema. |
-
-Here is an example of `SchemaInfo`:
-
-```java
-PulsarAdmin admin = …;
-
-SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");
-```
-
-</TabItem>
-
-</Tabs>
-````
-
-### Get a schema (specific)
-
-To get a specific version of a schema, you can use one of the following
methods.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the `get` subcommand.
-
-```bash
-pulsar-admin schemas get <topic-name> --version=<version>
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `GET` request to a schema endpoint: {@inject:
endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema?version=@pulsar:version_number@}
-
-Here is an example of a response, which is returned in JSON format.
-
-```json
-{
- "version": "<the-version-number-of-the-schema>",
- "type": "<the-schema-type>",
- "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
- "data": "<an-utf8-encoded-string-of-schema-definition-data>",
- "properties": {} // the properties associated with the schema
-}
-```
-
-The response includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `version` | The schema version, which is a long number. |
-| `type` | The schema type. |
-| `timestamp` | The timestamp of creating this version of schema. |
-| `data` | The schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a </li>**primitive**<li>schema, this field should be
blank. </li><li>If the schema is a </li>**struct**<li>schema, this field should
be a JSON string of the Avro schema definition. </li> |
-| `properties` | The additional properties associated with the schema. |
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-```java
-SchemaInfo createSchema(String topic, long version)
-```
-
-The `SchemaInfo` includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `name` | The schema name. |
-| `type` | The schema type. |
-| `schema` | A byte array of the schema definition data, which is encoded
in UTF 8. <li>If the schema is a </li>**primitive**<li>schema, this byte array
should be empty. </li><li>If the schema is a </li>**struct**<li>schema, this
field should be a JSON string of the Avro schema definition converted to a byte
array. </li> |
-| `properties` | The additional properties associated with the schema. |
-
-Here is an example of `SchemaInfo`:
-
-```java
-PulsarAdmin admin = …;
-
-SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);
-```
-
-</TabItem>
-
-</Tabs>
-````
-
-### Extract a schema
-
-To provide a schema via a topic, you can use the following method.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the `extract` subcommand.
-
-```bash
-pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type
<type-name>
-```
-
-</TabItem>
-
-</Tabs>
-````
-
-### Delete a schema
-
-To delete a schema for a topic, you can use one of the following methods.
-
-:::note
-
-In any case, the **delete** action deletes **all versions** of a schema
registered for a topic.
-
-:::
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the `delete` subcommand.
-
-```bash
-pulsar-admin schemas delete <topic-name>
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `DELETE` request to a schema endpoint: {@inject:
endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema?version=@pulsar:version_number@}
-
-Here is an example of a response, which is returned in JSON format.
-
-```json
-{
- "version": "<the-latest-version-number-of-the-schema>",
-}
-```
-
-The response includes the following field:
-
-Field | Description |
----|---|
-`version` | The schema version, which is a long number. |
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-```java
-void deleteSchema(String topic)
-```
-
-Here is an example of deleting a schema.
-
-```java
-PulsarAdmin admin = …;
-
-admin.deleteSchema("my-tenant/my-ns/my-topic");
-```
-
-</TabItem>
-
-</Tabs>
-````
-
-## Custom schema storage
-
-By default, Pulsar stores various data types of schemas in [Apache
BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar.
-
-However, you can use another storage system if needed.
-
-### Implement
-
-To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you
need to implement the following Java interfaces:
-
-* [SchemaStorage interface](#schemastorage-interface)
-
-* [SchemaStorageFactory interface](#schemastoragefactory-interface)
-
-#### SchemaStorage interface
-
-The `SchemaStorage` interface has the following methods:
-
-```java
-public interface SchemaStorage {
- // How schemas are updated
- CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[]
hash);
-
- // How schemas are fetched from storage
- CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
-
- // How schemas are deleted
- CompletableFuture<SchemaVersion> delete(String key);
-
- // Utility method for converting a schema version byte array to a
SchemaVersion object
- SchemaVersion versionFromBytes(byte[] version);
-
- // Startup behavior for the schema storage client
- void start() throws Exception;
-
- // Shutdown behavior for the schema storage client
- void close() throws Exception;
-}
-```
-
-:::tip
-
-For a complete example of **schema storage** implementation, see
[BookKeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java)
class.
-
-:::
-
-#### SchemaStorageFactory interface
-
-The `SchemaStorageFactory` interface has the following method:
-
-```java
-public interface SchemaStorageFactory {
- @NotNull
- SchemaStorage create(PulsarService pulsar) throws Exception;
-}
-```
-
-:::tip
-
-For a complete example of **schema storage factory** implementation, see
[BookKeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java)
class.
-
-:::
-
-### Deploy
-
-To use your custom schema storage implementation, perform the following steps.
-
-1. Package the implementation in a
[JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html)
file.
-
-2. Add the JAR file to the `lib` folder in your Pulsar binary or source
distribution.
-
-3. Change the `schemaRegistryStorageClassName` configuration in `broker.conf`
to your custom factory class.
-
-4. Start Pulsar.
-
-## Set schema compatibility check strategy
-
-You can set [schema compatibility check
strategy](schema-evolution-compatibility.md#schema-compatibility-check-strategy)
at the topic, namespace or broker level.
-
-The schema compatibility check strategy set at different levels has priority:
topic level > namespace level > broker level.
-
-- If you set the strategy at both topic and namespace level, it uses the
topic-level strategy.
-
-- If you set the strategy at both namespace and broker level, it uses the
namespace-level strategy.
-
-- If you do not set the strategy at any level, it uses the `FULL` strategy.
For all available values, see
[here](schema-evolution-compatibility.md#schema-compatibility-check-strategy).
-
-
-### Topic level
-
-To set a schema compatibility check strategy at the topic level, use one of
the following methods.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the [`pulsar-admin topicPolicies
set-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
-
-```shell
-pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy>
<topicName>
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `PUT` request to this endpoint: {@inject:
endpoint|PUT|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-```java
-void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy
strategy)
-```
-
-Here is an example of setting a schema compatibility check strategy at the
topic level.
-
-```java
-PulsarAdmin admin = …;
-
-admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic",
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
-```
-
-</TabItem>
-
-</Tabs>
-````
-<br />
-To get the topic-level schema compatibility check strategy, use one of the
following methods.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the [`pulsar-admin topicPolicies
get-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
-
-```shell
-pulsar-admin topicPolicies get-schema-compatibility-strategy <topicName>
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `GET` request to this endpoint: {@inject:
endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-```java
-SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic,
boolean applied)
-```
-
-Here is an example of getting the topic-level schema compatibility check
strategy.
-
-```java
-PulsarAdmin admin = …;
-
-// get the current applied schema compatibility strategy
-admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic",
true);
-
-// only get the schema compatibility strategy from topic policies
-admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic",
false);
-```
-
-</TabItem>
-
-</Tabs>
-````
-<br />
-To remove the topic-level schema compatibility check strategy, use one of the
following methods.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the [`pulsar-admin topicPolicies
remove-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
-
-```shell
-pulsar-admin topicPolicies remove-schema-compatibility-strategy <topicName>
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `DELETE` request to this endpoint: {@inject:
endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-```java
-void removeSchemaCompatibilityStrategy(String topic)
-```
-
-Here is an example of removing the topic-level schema compatibility check
strategy.
-
-```java
-PulsarAdmin admin = …;
-
-admin.removeSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic");
-```
-
-</TabItem>
-
-</Tabs>
-````
-
-
-### Namespace level
-
-You can set schema compatibility check strategy at namespace level using one
of the following methods.
-
-````mdx-code-block
-<Tabs groupId="api-choice"
- defaultValue="Admin CLI"
- values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST
API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
-
-<TabItem value="Admin CLI">
-
-Use the [`pulsar-admin namespaces
set-schema-compatibility-strategy`](/tools/pulsar-admin/) command.
-
-```shell
-pulsar-admin namespaces set-schema-compatibility-strategy options
-```
-
-</TabItem>
-<TabItem value="REST API">
-
-Send a `PUT` request to this endpoint: {@inject:
endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
-
-</TabItem>
-<TabItem value="Java Admin API">
-
-Use the [`setSchemaCompatibilityStrategy`](/api/admin/)method.
-
-```java
-admin.namespaces().setSchemaCompatibilityStrategy("test",
SchemaCompatibilityStrategy.FULL);
-```
-
-</TabItem>
-
-</Tabs>
-````
-
-### Broker level
-
-You can set schema compatibility check strategy at broker level by setting
`schemaCompatibilityStrategy` in
[`broker.conf`](https://github.com/apache/pulsar/blob/f24b4890c278f72a67fe30e7bf22dc36d71aac6a/conf/broker.conf#L1240)
or
[`standalone.conf`](https://github.com/apache/pulsar/blob/master/conf/standalone.conf)
file.
-
-**Example**
-
-```conf
-schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE
-```
+import {Redirect} from '@docusaurus/router';
+<Redirect to="admin-api-schemas" />
+````
\ No newline at end of file
diff --git a/site2/website-next/docs/schema-overview.md
b/site2/website-next/docs/schema-overview.md
new file mode 100644
index 00000000000..9560a95443f
--- /dev/null
+++ b/site2/website-next/docs/schema-overview.md
@@ -0,0 +1,154 @@
+---
+id: schema-overview
+title: Overview
+sidebar_label: "Overview"
+---
+
+This section introduces the following content:
+* [What is Pulsar Schema](#what-is-pulsar-schema)
+* [Why use it](#why-use-it)
+* [How it works](#how-it-works)
+* [Use case](#use-case)
+* [What's next?](#whats-next)
+
+## What is Pulsar Schema
+
+Pulsar messages are stored as unstructured byte arrays, and the data structure (also known as the schema) is applied to this data only when it's read. The schema serializes the data into bytes before it is published to a topic and deserializes the bytes before they are delivered to the consumers, dictating which data types are recognized as valid for a given topic.
+
+Pulsar schema registry is a central repository to store the schema
information, which enables producers/consumers to coordinate on the schema of a
topic’s data through brokers.
+
+:::note
+
+Currently, Pulsar schema is only available for the [Java
client](client-libraries-java.md), [Go client](client-libraries-go.md), [Python
client](client-libraries-python.md), and [C++ client](client-libraries-cpp.md).
+
+:::
+
+## Why use it
+
+Type safety is extremely important in any application built around a messaging
and streaming system. Raw bytes are flexible for data transfer, but the
flexibility and neutrality come with a cost: you have to overlay data type
checking and serialization/deserialization to ensure that the bytes fed into
the system can be read and successfully consumed. In other words, you need to
make sure the data is intelligible and usable to applications.
+
+Pulsar schema resolves the pain points with the following capabilities:
+* enforces data type safety when a topic has a schema defined. As a result, producers/consumers are only allowed to connect if they use a “compatible” schema.
+* provides a central location for storing information about the schemas used within your organization, which in turn greatly simplifies the sharing of this information across application teams.
+* serves as a single source of truth for all the message schemas used across all your services and development teams, which makes it easier for them to collaborate.
+* keeps data compatibility on track between schema versions. When new schemas are uploaded, the new versions can be read by old consumers.
+* is stored in the existing BookKeeper storage layer, so no additional system is required.
+
+## How it works
+
+Pulsar schemas are applied and enforced at the **topic** level (schemas cannot
be applied at the namespace or tenant level).
+
+Producers and consumers upload schemas to brokers, so Pulsar schemas work on
the producer side and the consumer side.
+
+### Producer side
+
+This diagram illustrates how schema works on the producer side.
+
+
+
+1. The application uses a schema instance to construct a producer instance.
+
+ The schema instance defines the schema for the data being produced using
the producer instance.
+
+ Take AVRO as an example, Pulsar extracts schema definition from the POJO
class and constructs the `SchemaInfo` that the producer needs to pass to a
broker when it connects.
+
+2. The producer connects to the broker with the `SchemaInfo` extracted from
the passed-in schema instance.
+
+3. The broker looks up the schema in the schema storage to check if it is
already a registered schema.
+
+4. If yes, the broker skips the schema validation since it is a known schema,
and returns the schema version to the producer.
+
+5. If no, the broker verifies whether a schema can be automatically created in
this namespace:
+
+ * If `isAllowAutoUpdateSchema` is set to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.
+
+ * If `isAllowAutoUpdateSchema` is set to **false**, then a schema cannot be created, and the producer is rejected and cannot connect to the broker.
+
+ :::tip
+
+ `isAllowAutoUpdateSchema` can be set via the **Pulsar admin API** or the **REST API**.
+
+ For how to set `isAllowAutoUpdateSchema` via the Pulsar admin API, see [Manage AutoUpdate Strategy](admin-api-schemas.md#manage-autoupdate-strategy) and the Java admin sketch after this list.
+
+ :::
+
+6. If the schema is allowed to be updated, then the compatibility check is performed.
+
+ * If the schema is compatible, the broker stores it and returns the schema
version to the producer. All the messages produced by this producer are tagged
with the schema version.
+
+ * If the schema is incompatible, the broker rejects it.
+
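+The `isAllowAutoUpdateSchema` switch referenced in step 5 can also be toggled with the Java admin client. A minimal sketch, assuming a hypothetical namespace (see [Manage AutoUpdate Strategy](admin-api-schemas.md#manage-autoupdate-strategy) for the CLI equivalent):
+
+```java
+PulsarAdmin admin = …;
+
+// Disallow clients from auto-registering new schema versions in this namespace.
+admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-ns", false);
+```
+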
+### Consumer side
+
+This diagram illustrates how schema works on the consumer side.
+
+
+
+1. The application uses a schema instance to construct a consumer instance.
+
+ The schema instance defines the schema that the consumer uses for decoding
messages received from a broker.
+
+2. The consumer connects to the broker with the `SchemaInfo` extracted from
the passed-in schema instance.
+
+3. The broker determines whether the topic already has any of the following: a schema, data, or a local producer/consumer.
+
+4. If the topic has none of the above (no schema, no data, and no local producer/consumer):
+
+ * If `isAllowAutoUpdateSchema` is set to **true**, then the consumer registers a schema and is connected to the broker.
+
+ * If `isAllowAutoUpdateSchema` is set to **false**, then the consumer is rejected and cannot connect to the broker.
+
+5. If the topic already has any of the above (a schema, data, or a local producer/consumer), then the schema compatibility check is performed.
+
+ * If the schema passes the compatibility check, then the consumer is
connected to the broker.
+
+ * If the schema does not pass the compatibility check, then the consumer is rejected and cannot connect to the broker.
+
+6. The consumer receives messages from the broker.
+
+ If the schema used by the consumer supports schema versioning (for example,
AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in
messages and uses the passed-in schema and the schema tagged in messages to
decode the messages.
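+
+As an illustration of step 6, here is a minimal sketch, assuming a hypothetical topic and subscription name:
+
+```java
+Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
+        .topic("sensor-data")
+        .subscriptionName("overview-sub")
+        .subscribe();
+
+Message<GenericRecord> msg = consumer.receive();
+// The broker tags each message with the schema version it was produced with;
+// AUTO_CONSUME fetches the matching SchemaInfo to decode the payload.
+byte[] taggedVersion = msg.getSchemaVersion();
+GenericRecord record = msg.getValue();
+```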
+
+## Use case
+
+You can use language-specific types of data when constructing and handling messages, from simple data types like `string` to more complex application-specific types.
+
+For example, suppose you are using the _User_ class to define the messages sent to Pulsar topics.
+
+```java
+public class User {
+ String name;
+ int age;
+}
+```
+
+**Without a schema**
+
+If you construct a producer without specifying a schema, then the producer can
only produce messages of type `byte[]`. If you have a POJO class, you need to
serialize the POJO into bytes before sending messages.
+
+```java
+Producer<byte[]> producer = client.newProducer()
+ .topic(topic)
+ .create();
+User user = new User("Tom", 28);
+byte[] message = … // serialize the `user` by yourself;
+producer.send(message);
+```
+
+**With a schema**
+
+This example constructs a producer with the _JSONSchema_, and you can send the
_User_ class to topics directly without worrying about how to serialize POJOs
into bytes.
+
+```java
+Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
+ .topic(topic)
+ .create();
+User user = new User("Tom", 28);
+producer.send(user);
+```
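+
+On the consuming side, the same schema yields typed messages directly. A minimal sketch, assuming a hypothetical subscription name:
+
+```java
+Consumer<User> consumer = client.newConsumer(JSONSchema.of(User.class))
+        .topic(topic)
+        .subscriptionName("user-sub")
+        .subscribe();
+User user = consumer.receive().getValue();
+```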
+
+## What's next?
+
+* [Understand basic concepts](schema-understand.md)
+* [Schema evolution and compatibility](schema-evolution-compatibility.md)
+* [Get started](schema-get-started.md)
+* [Manage schema](admin-api-schemas.md)
diff --git a/site2/website-next/docs/schema-understand.md
b/site2/website-next/docs/schema-understand.md
index 434b26c9893..64cb6c9b6d8 100644
--- a/site2/website-next/docs/schema-understand.md
+++ b/site2/website-next/docs/schema-understand.md
@@ -10,22 +10,20 @@ import TabItem from '@theme/TabItem';
````
-This chapter explains the basic concepts of Pulsar schema, focuses on the
topics of particular importance, and provides additional background.
+This section explains the basic concepts of Pulsar schema and provides
additional reference.
-## SchemaInfo
+## Schema definition
-Pulsar schema is defined in a data structure called `SchemaInfo`.
-
-The `SchemaInfo` is stored and enforced on a per-topic basis and cannot be
stored at the namespace or tenant level.
+Pulsar schema is defined in a data structure called `SchemaInfo`. It is stored
and enforced on a per-topic basis and cannot be stored at the namespace or
tenant level.
A `SchemaInfo` consists of the following fields:
| Field | Description |
| --- | --- |
| `name` | Schema name (a string). |
-| `type` | Schema type, which determines how to interpret the schema data.
<li>Predefined schema: see [here](schema-understand.md#schema-type).
</li><li>Customized schema: it is left as an empty string. </li> |
+| `type` | Schema type, which determines how to interpret the schema data.
<li>Predefined schema: see [here](#schema-type). </li><li>Customized schema: it
is left as an empty string. </li> |
| `schema`(`payload`) | Schema data, which is a sequence of 8-bit unsigned
bytes and schema-type specific. |
-| `properties` | It is a user defined properties as a string/string map.
Applications can use this bag for carrying any application specific logics.
Possible properties might be the Git hash associated with the schema, an
environment string like `dev` or `prod`. |
+| `properties` | A user-defined property bag (a string/string map). Applications can use this bag for carrying any application-specific logic. Possible properties might be the Git hash associated with the schema, and an environment string like `dev` or `prod`. |
**Example**
@@ -43,74 +41,36 @@ This is the `SchemaInfo` of a string.
## Schema type
Pulsar supports various schema types, which are mainly divided into two
categories:
-
-* Primitive type
-
-* Complex type
+* [Primitive type](#primitive-type)
+* [Complex type](#complex-type)
### Primitive type
-Currently, Pulsar supports the following primitive types:
-
-| Primitive Type | Description |
-|---|---|
-| `BOOLEAN` | A binary value |
-| `INT8` | A 8-bit signed integer |
-| `INT16` | A 16-bit signed integer |
-| `INT32` | A 32-bit signed integer |
-| `INT64` | A 64-bit signed integer |
-| `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number |
-| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number |
-| `BYTES` | A sequence of 8-bit unsigned bytes |
-| `STRING` | A Unicode character sequence |
-| `TIMESTAMP` (`DATE`, `TIME`) | A logic type represents a specific instant
in time with millisecond precision. <br />It stores the number of milliseconds
since `January 1, 1970, 00:00:00 GMT` as an `INT64` value |
-| INSTANT | A single instantaneous point on the time-line with nanoseconds
precision|
-| LOCAL_DATE | An immutable date-time object that represents a date, often
viewed as year-month-day|
-| LOCAL_TIME | An immutable date-time object that represents a time, often
viewed as hour-minute-second. Time is represented to nanosecond precision.|
-| LOCAL_DATE_TIME | An immutable date-time object that represents a date-time,
often viewed as year-month-day-hour-minute-second |
-
-For primitive types, Pulsar does not store any schema data in `SchemaInfo`.
The `type` in `SchemaInfo` is used to determine how to serialize and
deserialize the data.
+The following table outlines the primitive types that Pulsar schema supports,
and the conversions between **schema types** and **language-specific primitive
types**.
+
+| Primitive Type | Description | Java Type| Python Type | Go Type |
+|---|---|---|---|---|
+| `BOOLEAN` | A binary value | boolean | bool | bool |
+| `INT8` | An 8-bit signed integer | byte | | int8 |
+| `INT16` | A 16-bit signed integer | short | | int16 |
+| `INT32` | A 32-bit signed integer | int | | int32 |
+| `INT64` | A 64-bit signed integer | long | | int64 |
+| `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number | float
| float | float32 |
+| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number |
double | float | float64|
+| `BYTES` | A sequence of 8-bit unsigned bytes | byte[], ByteBuffer, ByteBuf |
bytes | []byte |
+| `STRING` | A Unicode character sequence | string | str | string|
+| `TIMESTAMP` (`DATE`, `TIME`) | A logical type that represents a specific instant in time with millisecond precision. <br />It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value | java.sql.Timestamp (java.sql.Time, java.util.Date) | | |
+| `INSTANT` | A single instantaneous point on the timeline with nanosecond precision | java.time.Instant | | |
+| `LOCAL_DATE` | An immutable date-time object that represents a date, often viewed as year-month-day | java.time.LocalDate | | |
+| `LOCAL_TIME` | An immutable date-time object that represents a time, often viewed as hour-minute-second, with nanosecond precision | java.time.LocalTime | | |
+| `LOCAL_DATE_TIME` | An immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second | java.time.LocalDateTime | | |
+
+For primitive types, Pulsar does not store any schema data in `SchemaInfo`.
The `type` in `SchemaInfo` determines how to serialize and deserialize the
data.
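+
+For example, the `INT32` schema type maps directly to Java `int`/`Integer`. A minimal sketch, assuming a hypothetical topic and subscription name:
+
+```java
+Producer<Integer> producer = client.newProducer(Schema.INT32)
+        .topic("int-topic")
+        .create();
+producer.send(42);
+
+Consumer<Integer> consumer = client.newConsumer(Schema.INT32)
+        .topic("int-topic")
+        .subscriptionName("int-sub")
+        .subscribe();
+int value = consumer.receive().getValue();
+```
+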
Some of the primitive schema implementations can use `properties` to store
implementation-specific tunable settings. For example, a `string` schema can
use `properties` to store the encoding charset to serialize and deserialize
strings.
-The conversions between **Pulsar schema types** and **language-specific
primitive types** are as below.
-
-| Schema Type | Java Type| Python Type | Go Type |
-|---|---|---|---|
-| BOOLEAN | boolean | bool | bool |
-| INT8 | byte | | int8 |
-| INT16 | short | | int16 |
-| INT32 | int | | int32 |
-| INT64 | long | | int64 |
-| FLOAT | float | float | float32 |
-| DOUBLE | double | float | float64|
-| BYTES | byte[], ByteBuffer, ByteBuf | bytes | []byte |
-| STRING | string | str | string|
-| TIMESTAMP | java.sql.Timestamp | | |
-| TIME | java.sql.Time | | |
-| DATE | java.util.Date | | |
-| INSTANT | java.time.Instant | | |
-| LOCAL_DATE | java.time.LocalDate | | |
-| LOCAL_TIME | java.time.LocalDateTime | |
-| LOCAL_DATE_TIME | java.time.LocalTime | |
-
-**Example**
-
-This example demonstrates how to use a string schema.
+For more instructions, see [Construct a string
schema](schema-get-started.md#construct-a-string-schema).
-1. Create a producer with a string schema and send messages.
-
- ```java
- Producer<String> producer = client.newProducer(Schema.STRING).create();
- producer.newMessage().value("Hello Pulsar!").send();
- ```
-
-2. Create a consumer with a string schema and receive messages.
-
- ```java
- Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
- consumer.receive();
- ```
### Complex type
@@ -118,319 +78,50 @@ Currently, Pulsar supports the following complex types:
| Complex Type | Description |
|---|---|
-| `keyvalue` | Represents a complex type of a key/value pair. |
-| `struct` | Handles structured data. It supports `AvroBaseStructSchema` and
`ProtobufNativeSchema`. |
-
-#### keyvalue
+| `KeyValue` | Represents a complex type of a key/value pair. |
+| `Struct` | Handles structured data. It supports `AvroBaseStructSchema` and
`ProtobufNativeSchema`. |
-`Keyvalue` schema helps applications define schemas for both key and value.
+#### `KeyValue` schema
-For `SchemaInfo` of `keyvalue` schema, Pulsar stores the `SchemaInfo` of key
schema and the `SchemaInfo` of value schema together.
+`KeyValue` schema helps applications define schemas for both key and value.
Pulsar stores the `SchemaInfo` of key schema and the value schema together.
-Pulsar provides the following methods to encode a key/value pair in messages:
+You can choose the encoding type when constructing the key/value schema:
+* `INLINE` - The key and value are encoded together in the message payload.
+* `SEPARATED` - The key is encoded in the message key and the value is encoded in the message payload, as shown in the sketch below. For more instructions, see [Construct a key/value schema](schema-get-started.md#construct-a-keyvalue-schema).
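+
+A minimal sketch of the `SEPARATED` case, assuming a hypothetical topic name:
+
+```java
+Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+        Schema.INT32,
+        Schema.STRING,
+        KeyValueEncodingType.SEPARATED);
+
+Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
+        .topic("kv-topic")
+        .create();
+
+// The key goes into the message key, the value into the message payload.
+producer.newMessage().value(new KeyValue<>(100, "value-100")).send();
+```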
-* `INLINE`
-
-* `SEPARATED`
-
-You can choose the encoding type when constructing the key/value schema.
-
-````mdx-code-block
-<Tabs
- defaultValue="INLINE"
-
values={[{"label":"INLINE","value":"INLINE"},{"label":"SEPARATED","value":"SEPARATED"}]}>
+#### `Struct` schema
-<TabItem value="INLINE">
-
-Key/value pairs are encoded together in the message payload.
-
-</TabItem>
-<TabItem value="SEPARATED">
-
-Key is encoded in the message key and the value is encoded in the message
payload.
-
-**Example**
-
-This example shows how to construct a key/value schema and then use it to
produce and consume messages.
-
-1. Construct a key/value schema with `INLINE` encoding type.
-
- ```java
- Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
- Schema.INT32,
- Schema.STRING,
- KeyValueEncodingType.INLINE
- );
- ```
-
-2. Optionally, construct a key/value schema with `SEPARATED` encoding type.
-
- ```java
- Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
- Schema.INT32,
- Schema.STRING,
- KeyValueEncodingType.SEPARATED
- );
- ```
-
-3. Produce messages using a key/value schema.
-
- ```java
- Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
- Schema.INT32,
- Schema.STRING,
- KeyValueEncodingType.SEPARATED
- );
-
- Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
- .topic(TOPIC)
- .create();
-
- final int key = 100;
- final String value = "value-100";
-
- // send the key/value message
- producer.newMessage()
- .value(new KeyValue(key, value))
- .send();
- ```
-
-4. Consume messages using a key/value schema.
-
- ```java
- Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
- Schema.INT32,
- Schema.STRING,
- KeyValueEncodingType.SEPARATED
- );
-
- Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
- ...
- .topic(TOPIC)
- .subscriptionName(SubscriptionName).subscribe();
-
- // receive key/value pair
- Message<KeyValue<Integer, String>> msg = consumer.receive();
- KeyValue<Integer, String> kv = msg.getValue();
- ```
-
-</TabItem>
-
-</Tabs>
-````
-
-#### struct
-
-This section describes the details of type and usage of the `struct` schema.
-
-##### Type
-
-`struct` schema supports `AvroBaseStructSchema` and `ProtobufNativeSchema`.
+`Struct` schema supports `AvroBaseStructSchema` and `ProtobufNativeSchema`.
|Type|Description|
---|---|
`AvroBaseStructSchema`|Pulsar uses [Avro
Specification](http://avro.apache.org/docs/current/spec.html) to declare the
schema definition for `AvroBaseStructSchema`, which supports `AvroSchema`,
`JsonSchema`, and `ProtobufSchema`. <br /><br />This allows Pulsar:<br />- to
use the same tools to manage schema definitions<br />- to use different
serialization or deserialization methods to handle data|
`ProtobufNativeSchema`|`ProtobufNativeSchema` is based on protobuf native
Descriptor. <br /><br />This allows Pulsar:<br />- to use native protobuf-v3 to
serialize or deserialize data<br />- to use `AutoConsume` to deserialize data.
-##### Usage
-
-Pulsar provides the following methods to use the `struct` schema:
-
+Pulsar provides the following methods to use the `struct` schema:
* `static`
-
* `generic`
-
* `SchemaDefinition`
-````mdx-code-block
-<Tabs
- defaultValue="static"
-
values={[{"label":"static","value":"static"},{"label":"generic","value":"generic"},{"label":"SchemaDefinition","value":"SchemaDefinition"}]}>
-
-<TabItem value="static">
-
-You can predefine the `struct` schema, which can be a POJO in Java, a `struct`
in Go, or classes generated by Avro or Protobuf tools.
-
-**Example**
-
-Pulsar gets the schema definition from the predefined `struct` using an Avro
library. The schema definition is the schema data stored as a part of the
`SchemaInfo`.
-
-1. Create the _User_ class to define the messages sent to Pulsar topics.
-
- ```java
- @Builder
- @AllArgsConstructor
- @NoArgsConstructor
- public static class User {
- String name;
- int age;
- }
- ```
-
-2. Create a producer with a `struct` schema and send messages.
-
- ```java
- Producer<User> producer =
client.newProducer(Schema.AVRO(User.class)).create();
-
producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
- ```
-
-3. Create a consumer with a `struct` schema and receive messages
-
- ```java
- Consumer<User> consumer =
client.newConsumer(Schema.AVRO(User.class)).subscribe();
- User user = consumer.receive().getValue();
- ```
-
-</TabItem>
-<TabItem value="generic">
-
-Sometimes applications do not have pre-defined structs, and you can use this
method to define schema and access data.
-
-You can define the `struct` schema using the `GenericSchemaBuilder`, generate
a generic struct using `GenericRecordBuilder` and consume messages into
`GenericRecord`.
-
-**Example**
-
-1. Use `RecordSchemaBuilder` to build a schema.
-
- ```java
- RecordSchemaBuilder recordSchemaBuilder =
SchemaBuilder.record("schemaName");
- recordSchemaBuilder.field("intField").type(SchemaType.INT32);
- SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
-
- Producer<GenericRecord> producer =
client.newProducer(Schema.generic(schemaInfo)).create();
- ```
-
-2. Use `RecordBuilder` to build the struct records.
-
- ```java
- producer.newMessage().value(schema.newRecordBuilder()
- .set("intField", 32)
- .build()).send();
- ```
-
-</TabItem>
-<TabItem value="SchemaDefinition">
-
-You can define the `schemaDefinition` to generate a `struct` schema.
-
-**Example**
-
-1. Create the _User_ class to define the messages sent to Pulsar topics.
-
- ```java
- @Builder
- @AllArgsConstructor
- @NoArgsConstructor
- public static class User {
- String name;
- int age;
- }
- ```
-
-2. Create a producer with a `SchemaDefinition` and send messages.
-
- ```java
- SchemaDefinition<User> schemaDefinition =
SchemaDefinition.<User>builder().withPojo(User.class).build();
- Producer<User> producer =
client.newProducer(Schema.AVRO(schemaDefinition)).create();
-
producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
- ```
-
-3. Create a consumer with a `SchemaDefinition` schema and receive messages
-
- ```java
- SchemaDefinition<User> schemaDefinition =
SchemaDefinition.<User>builder().withPojo(User.class).build();
- Consumer<User> consumer =
client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe();
- User user = consumer.receive().getValue();
- ```
-
-</TabItem>
-
-</Tabs>
-````
+For more examples, see [Construct a struct
schema](schema-get-started.md#construct-a-struct-schema).
### Auto Schema
If you don't know the schema type of a Pulsar topic in advance, you can use
AUTO schema to produce or consume generic records to or from brokers.
-| Auto Schema Type | Description |
-|---|---|
-| `AUTO_PRODUCE` | This is useful for transferring data **from a producer to a
Pulsar topic that has a schema**. |
-| `AUTO_CONSUME` | This is useful for transferring data **from a Pulsar topic
that has a schema to a consumer**. |
-
-#### AUTO_PRODUCE
-
-`AUTO_PRODUCE` schema helps a producer validate whether the bytes sent by the
producer is compatible with the schema of a topic.
-
-**Example**
-
-Suppose that:
-
-* You have a producer processing messages from a Kafka topic _K_.
-
-* You have a Pulsar topic _P_, and you do not know its schema type.
-
-* Your application reads the messages from _K_ and writes the messages to _P_.
-
-In this case, you can use `AUTO_PRODUCE` to verify whether the bytes produced
by _K_ can be sent to _P_ or not.
-
-```java
-Produce<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE())
- …
- .create();
-
-byte[] kafkaMessageBytes = … ;
-
-pulsarProducer.produce(kafkaMessageBytes);
-```
-
-#### AUTO_CONSUME
-
-`AUTO_CONSUME` schema helps a Pulsar topic validate whether the bytes sent by
a Pulsar topic is compatible with a consumer, that is, the Pulsar topic
deserializes messages into language-specific objects using the `SchemaInfo`
retrieved from broker-side.
-
-Currently, `AUTO_CONSUME` supports AVRO, JSON and ProtobufNativeSchema
schemas. It deserializes messages into `GenericRecord`.
-
-**Example**
-
-Suppose that:
-
-* You have a Pulsar topic _P_.
-
-* You have a consumer (for example, MySQL) receiving messages from the topic
_P_.
-
-* Your application reads the messages from _P_ and writes the messages to
MySQL.
-
-In this case, you can use `AUTO_CONSUME` to verify whether the bytes produced
by _P_ can be sent to MySQL or not.
-
-```java
-Consumer<GenericRecord> pulsarConsumer =
client.newConsumer(Schema.AUTO_CONSUME())
- …
- .subscribe();
-
-Message<GenericRecord> msg = consumer.receive() ;
-GenericRecord record = msg.getValue();
-```
+Auto schema contains two categories:
+* `AUTO_PRODUCE` transfers data from a producer to a Pulsar topic that has a
schema and helps the producer validate whether the out-bound bytes are
compatible with the schema of the topic. For more instructions, see [Construct
an AUTO_PRODUCE schema](schema-get-started.md#construct-an-auto_produce-schema).
+* `AUTO_CONSUME` transfers data from a Pulsar topic that has a schema to a
consumer and helps the topic validate whether the out-bound bytes are
compatible with the consumer. In other words, the topic deserializes messages
into language-specific objects `GenericRecord` using the `SchemaInfo` retrieved
from brokers. Currently, `AUTO_CONSUME` supports AVRO, JSON and
ProtobufNativeSchema schemas. For more instructions, see [Construct an
AUTO_CONSUME schema](schema-get-started.md#construct-a [...]
### Native Avro Schema
When migrating or ingesting event or message data from external systems (such
as Kafka and Cassandra), the events are often already serialized in Avro
format. The applications producing the data typically have validated the data
against their schemas (including compatibility checks) and stored them in a
database or a dedicated service (such as a schema registry). The schema of each
serialized data record is usually retrievable by some metadata attached to that
record. In such cases, a Pu [...]
-Hence, we provide `Schema.NATIVE_AVRO` to wrap a native Avro schema of type
`org.apache.avro.Schema`. The result is a schema instance of Pulsar that
accepts a serialized Avro payload without validating it against the wrapped
Avro schema.
+Hence, we provide `Schema.NATIVE_AVRO` to wrap a native Avro schema of type `org.apache.avro.Schema`. The result is a Pulsar schema instance that accepts a serialized Avro payload without validating it against the wrapped Avro schema, as sketched below.
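+
+The sketch below illustrates this under a few assumptions that are not part of this page: a local broker at `pulsar://localhost:6650`, a hypothetical topic `ingress`, and a hypothetical `SensorReading` Avro record. The inline serialization only simulates a payload that, in practice, the upstream system has already produced.
+
+```java
+import java.io.ByteArrayOutputStream;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public class NativeAvroSketch {
+    public static void main(String[] args) throws Exception {
+        // Hypothetical Avro schema; in practice it usually comes from an
+        // external schema registry or from the upstream application.
+        org.apache.avro.Schema nativeAvroSchema = SchemaBuilder.record("SensorReading")
+                .fields().requiredDouble("value").endRecord();
+
+        // Simulate a payload that is already serialized with the native Avro schema.
+        GenericRecord reading = new GenericData.Record(nativeAvroSchema);
+        reading.put("value", 23.5);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+        new GenericDatumWriter<GenericRecord>(nativeAvroSchema).write(reading, encoder);
+        encoder.flush();
+        byte[] content = out.toByteArray();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")      // assumed local broker
+                .build();
+        Producer<byte[]> producer = client.newProducer()
+                .topic("ingress")                           // hypothetical topic
+                .create();
+
+        // NATIVE_AVRO wraps the Avro schema; the payload is sent as-is,
+        // without being re-validated against the wrapped schema.
+        producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema))
+                .value(content)
+                .send();
+
+        producer.close();
+        client.close();
+    }
+}
+```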
-**Example**
+## Schema versioning
-```java
-org.apache.avro.Schema nativeAvroSchema = … ;
-
-Producer<byte[]> producer =
pulsarClient.newProducer().topic("ingress").create();
-
-byte[] content = … ;
-
-producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();
-```
-
-## Schema version
-
-Each `SchemaInfo` stored with a topic has a version. Schema version manages
schema changes happening within a topic.
+Each `SchemaInfo` stored with a topic has a version. The schema version
manages schema changes happening within a topic.
Messages produced with a given `SchemaInfo` are tagged with a schema version, so when a message is consumed by a Pulsar client, the client can use the schema version to retrieve the corresponding `SchemaInfo` and then use that `SchemaInfo` to deserialize the data.
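+
+As a rough illustration (assuming a local broker, a topic `sensor-data`, and a small `SensorReading` POJO defined only for this sketch), a consumed message exposes the version of the schema it was produced with:
+
+```java
+import java.util.Arrays;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public class SchemaVersionSketch {
+    // Hypothetical POJO used only for this sketch.
+    public static class SensorReading {
+        public double value;
+    }
+
+    public static void main(String[] args) throws Exception {
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")        // assumed local broker
+                .build();
+
+        Consumer<SensorReading> consumer = client
+                .newConsumer(Schema.AVRO(SensorReading.class))
+                .topic("sensor-data")                          // hypothetical topic
+                .subscriptionName("schema-version-demo")       // hypothetical subscription
+                .subscribe();
+
+        Message<SensorReading> msg = consumer.receive();
+        // The version of the SchemaInfo the message was tagged with at produce
+        // time; the client uses it to fetch the matching SchemaInfo for decoding.
+        byte[] schemaVersion = msg.getSchemaVersion();
+        System.out.println("schema version: " + Arrays.toString(schemaVersion));
+
+        consumer.close();
+        client.close();
+    }
+}
+```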
@@ -462,78 +153,60 @@ The table below lists the possible scenarios when this
connection attempt occurs
| <li>No schema exists for the topic. </li> | (1) The producer is created
using the given schema. (2) Since no existing schema is compatible with the
`SensorReading` schema, the schema is transmitted to the broker and stored. (3)
Any consumer created using the same schema or topic can consume messages from
the `sensor-data` topic. |
| <li>A schema already exists. </li><li>The producer connects using the same
schema that is already stored. </li> | (1) The schema is transmitted to the
broker. (2) The broker determines that the schema is compatible. (3) The broker
attempts to store the schema in
[BookKeeper](concepts-architecture-overview.md#persistent-storage) but then
determines that it's already stored, so it is used to tag produced messages. |
<li>A schema already exists. </li><li>The producer connects using [...]
-## How does schema work
-
-Pulsar schemas are applied and enforced at the **topic** level (schemas cannot
be applied at the namespace or tenant level).
+## Schema AutoUpdate
-Producers and consumers upload schemas to brokers, so Pulsar schemas work on
the producer side and the consumer side.
+By default, if a schema passes the schema compatibility check, the Pulsar producer automatically registers (updates) that schema on the topic it produces to.
-### Producer side
+### AutoUpdate for producer
-This diagram illustrates how does schema work on the Producer side.
+For a producer, `AutoUpdate` happens in the following cases (a Java admin sketch of the related namespace settings follows this list):
-
+* If a **topic doesn’t have a schema**, Pulsar registers a schema
automatically.
-1. The application uses a schema instance to construct a producer instance.
+* If a **topic has a schema**:
- The schema instance defines the schema for the data being produced using
the producer instance.
+ * If a **producer doesn’t carry a schema**:
- Take AVRO as an example, Pulsar extracts schema definition from the POJO
class and constructs the `SchemaInfo` that the producer needs to pass to a
broker when it connects.
-
-2. The producer connects to the broker with the `SchemaInfo` extracted from
the passed-in schema instance.
-
-3. The broker looks up the schema in the schema storage to check if it is
already a registered schema.
-
-4. If yes, the broker skips the schema validation since it is a known schema,
and returns the schema version to the producer.
-
-5. If no, the broker verifies whether a schema can be automatically created in
this namespace:
+ * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is
**disabled** in the namespace to which the topic belongs, the producer is
allowed to connect to the topic and produce data.
+
+ * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is
**enabled** in the namespace to which the topic belongs, the producer is
rejected and disconnected.
- * If `isAllowAutoUpdateSchema` sets to **true**, then a schema can be
created, and the broker validates the schema based on the schema compatibility
check strategy defined for the topic.
+ * If a **producer carries a schema**:
- * If `isAllowAutoUpdateSchema` sets to **false**, then a schema can not be
created, and the producer is rejected to connect to the broker.
+ A broker performs the compatibility check based on the configured
compatibility check strategy of the namespace to which the topic belongs.
-:::tip
-
-`isAllowAutoUpdateSchema` can be set via **Pulsar admin API** or **REST API.**
-
-For how to set `isAllowAutoUpdateSchema` via Pulsar admin API, see [Manage
AutoUpdate Strategy](schema-manage.md#manage-autoupdate-strategy).
-
-:::
-
-6. If the schema is allowed to be updated, then the compatible strategy check
is performed.
+ * If the schema is already registered, the producer is connected to the broker.
- * If the schema is compatible, the broker stores it and returns the schema
version to the producer.
-
- All the messages produced by this producer are tagged with the schema
version.
-
- * If the schema is incompatible, the broker rejects it.
-
-### Consumer side
-
-This diagram illustrates how does Schema work on the consumer side.
-
-
-
-1. The application uses a schema instance to construct a consumer instance.
+ * If the schema is not registered:
+
+ * If `isAllowAutoUpdateSchema` is set to **false**, the producer is rejected and cannot connect to the broker.
+
+ * If `isAllowAutoUpdateSchema` is set to **true**:
- The schema instance defines the schema that the consumer uses for decoding
messages received from a broker.
+ * If the schema passes the compatibility check, then the broker
registers a new schema automatically for the topic and the producer is
connected.
+
+ * If the schema does not pass the compatibility check, then the broker does not register the schema and the producer is rejected and cannot connect to the broker.
-2. The consumer connects to the broker with the `SchemaInfo` extracted from
the passed-in schema instance.
+
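+The cases above hinge on namespace-level settings. The following is a minimal sketch using the Java admin client (`PulsarAdmin`); the admin endpoint and the `public/default` namespace are assumptions, and the equivalent `pulsar-admin` commands can be used instead.
+
+```java
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+
+public class SchemaAutoUpdateAdminSketch {
+    public static void main(String[] args) throws Exception {
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl("http://localhost:8080")   // assumed admin endpoint
+                .build();
+
+        String namespace = "public/default";               // assumed namespace
+
+        // Allow (or forbid) clients to register new schema versions automatically.
+        admin.namespaces().setIsAllowAutoUpdateSchema(namespace, true);
+
+        // Reject producers that connect without a schema to topics that have one.
+        admin.namespaces().setSchemaValidationEnforced(namespace, true);
+
+        // Choose the compatibility check strategy applied during AutoUpdate.
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace,
+                SchemaCompatibilityStrategy.BACKWARD);
+
+        admin.close();
+    }
+}
+```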
-3. The broker determines whether the topic has one of them (a schema/data/a
local consumer and a local producer).
+### AutoUpdate for consumer
-4. If a topic does not have all of them (a schema/data/a local consumer and a
local producer):
-
- * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer
registers a schema and it is connected to a broker.
-
- * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is
rejected to connect to a broker.
-
-5. If a topic has one of them (a schema/data/a local consumer and a local
producer), then the schema compatibility check is performed.
-
- * If the schema passes the compatibility check, then the consumer is
connected to the broker.
-
- * If the schema does not pass the compatibility check, then the consumer
is rejected to connect to the broker.
+For a consumer, `AutoUpdate` happens in the following cases:
-6. The consumer receives messages from the broker.
+* If a **consumer connects to a topic without a schema** (which means the consumer receives raw bytes), the consumer can connect to the topic successfully without any compatibility check (see the sketch after this list).
- If the schema used by the consumer supports schema versioning (for example,
AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in
messages and uses the passed-in schema and the schema tagged in messages to
decode the messages.
+* If a **consumer connects to a topic with a schema**:
+
+ * If the topic does not have any of them (a schema, data, a local consumer, or a local producer):
+
+ * If `isAllowAutoUpdateSchema` is set to **true**, then the consumer registers a schema and is connected to the broker.
+
+ * If `isAllowAutoUpdateSchema` is set to **false**, then the consumer is rejected and cannot connect to the broker.
+
+ * If the topic has at least one of them (a schema, data, a local consumer, or a local producer), then the schema compatibility check is performed.
+
+ * If the schema passes the compatibility check, then the consumer is
connected to the broker.
+
+ * If the schema does not pass the compatibility check, then the consumer is rejected and cannot connect to the broker.
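+
+For the first case above, a minimal sketch of a consumer that connects without a schema (and therefore skips the compatibility check) is shown below; the broker URL, topic, and subscription names are placeholders.
+
+```java
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public class RawBytesConsumerSketch {
+    public static void main(String[] args) throws Exception {
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")     // assumed local broker
+                .build();
+
+        // Schema.BYTES is the default; no schema is uploaded and no
+        // compatibility check is performed for this consumer.
+        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+                .topic("my-topic")                         // hypothetical topic
+                .subscriptionName("raw-bytes-sub")         // hypothetical subscription
+                .subscribe();
+
+        Message<byte[]> msg = consumer.receive();
+        System.out.println("received " + msg.getData().length + " raw bytes");
+
+        consumer.close();
+        client.close();
+    }
+}
+```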
+
+
\ No newline at end of file
diff --git a/site2/website-next/sidebars.json b/site2/website-next/sidebars.json
index 3d59f5e17ba..2565efedada 100644
--- a/site2/website-next/sidebars.json
+++ b/site2/website-next/sidebars.json
@@ -38,10 +38,10 @@
"type": "category",
"label": "Pulsar Schema",
"items": [
- "schema-get-started",
+ "schema-overview",
"schema-understand",
- "schema-evolution-compatibility",
- "schema-manage"
+ "schema-get-started",
+ "schema-evolution-compatibility"
]
},
{
@@ -324,6 +324,7 @@
"admin-api-topics",
"admin-api-functions",
"admin-api-packages",
+ "admin-api-schemas",
"admin-api-transactions"
]
},