labuladong commented on code in PR #18434:
URL: https://github.com/apache/pulsar/pull/18434#discussion_r1033415078
##########
site2/docs/schema-get-started.md:
##########
@@ -11,80 +11,283 @@ import TabItem from '@theme/TabItem';
````
-This hands-on tutorial provides instructions and examples on how to construct
and customize schemas.
+This hands-on tutorial provides instructions and examples on how to construct
schemas. For the instructions on administrative tasks, see [Manage
schema](admin-api-schemas.md).
-## Construct a string schema
+## Construct a schema
-This example demonstrates how to construct a [string
schema](schema-understand.md#primitive-type) and use it to produce and consume
messages in Java.
+- [Construct a bytes schema](#bytes)
+- [Construct a string schema](#string)
+- [Construct a key/value schema](#keyvalue)
+- [Construct an Avro schema](#avro)
+- [Construct a JSON schema](#json)
+- [Construct a Protobuf schema](#protobuf)
+- [Construct a ProtobufNative schema](#protobufnative)
+- [Construct a native Avro schema](#nativeavro)
+- [Construct an AUTO_PRODUCE schema](#auto_produce)
+- [Construct an AUTO_CONSUME schema](#auto_consume)
-1. Create a producer with a string schema and send messages.
+### `bytes`
+
+This example demonstrates how to construct a [bytes
schema](schema-understand.md#primitive-type) using language-specific clients
and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+```java
+ .topic("my-topic")
+ .create();
+Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
Review Comment:
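Some code seems to be missing here: the snippet starts with `.topic("my-topic")`, so the beginning of the producer statement is not shown.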
##########
site2/docs/schema-understand.md:
##########
@@ -146,67 +159,88 @@ Producer<SensorReading> producer =
client.newProducer(JSONSchema.of(SensorReadin
.create();
```
-The table below lists the possible scenarios when this connection attempt
occurs and what happens in each scenario:
+The table below outlines the possible scenarios when this connection attempt
occurs and the result of each scenario:
-| Scenario | What happens |
-| --- | --- |
-| <li>No schema exists for the topic. </li> | (1) The producer is created
using the given schema. (2) Since no existing schema is compatible with the
`SensorReading` schema, the schema is transmitted to the broker and stored. (3)
Any consumer created using the same schema or topic can consume messages from
the `sensor-data` topic. |
-| <li>A schema already exists. </li><li>The producer connects using the same
schema that is already stored. </li> | (1) The schema is transmitted to the
broker. (2) The broker determines that the schema is compatible. (3) The broker
attempts to store the schema in
[BookKeeper](concepts-architecture-overview.md#persistent-storage) but then
determines that it's already stored, so it is used to tag produced messages. |
<li>A schema already exists. </li><li>The producer connects using a new
schema that is compatible. </li> | (1) The schema is transmitted to the
broker. (2) The broker determines that the schema is compatible and stores the
new schema as the current version (with a new version number). |
+| Scenario
| Result
|
+|-----------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| <li>No schema exists for the topic. </li>
| (1) The producer is created with the given
schema. <br /> (2) The schema is transmitted to the broker and stored since
there is no existing schema. <br /> (3) Any consumer created using the same
schema or topic can consume messages from the `sensor-data` topic.
|
+| <li>A schema already exists. </li><li>The producer connects using the same
schema that is already stored. </li> | (1) The schema is transmitted to the
broker.<br /> (2) The broker determines that the schema is compatible. <br />
(3) The broker attempts to store the schema in
[BookKeeper](concepts-architecture-overview.md#persistent-storage) but then
determines that it's already stored, so it is used to tag produced messages. |
+| <li>A schema already exists. </li><li>The producer connects using a new
schema that is compatible. </li> | (1) The schema is transmitted to the
broker. <br /> (2) The broker determines that the schema is compatible and
stores the new schema as the current version (with a new version number).
|
-## Schema AutoUpdate
+### Schema compatibility check
-If a schema passes the schema compatibility check, Pulsar producer
automatically updates this schema to the topic it produces by default.
+The purpose of schema compatibility check is to ensure that existing consumers
can process the introduced messages.
-### AutoUpdate for producer
+When receiving a `SchemaInfo` from producers, brokers recognize the schema
type and deploy the schema compatibility checker
([`schemaRegistryCompatibilityCheckers`](https://github.com/apache/pulsar/blob/bf194b557c48e2d3246e44f1fc28876932d8ecb8/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java))
for that schema type to check if the `SchemaInfo` is compatible with the
schema of the topic by applying the configured compatibility check strategy.
-For a producer, the `AutoUpdate` happens in the following cases:
+The default value of `schemaRegistryCompatibilityCheckers` in the
`conf/broker.conf` file is as follows.
+
+```properties
+schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
+```
-* If a **topic doesn’t have a schema**, Pulsar registers a schema
automatically.
+Each schema type corresponds to one instance of the schema compatibility
checker. Avro, JSON, and Protobuf schemas have their own compatibility
checkers, while all the other schema types share the default compatibility
checker that disables the schema evolution.
-* If a **topic has a schema**:
+#### Schema compatibility check strategy
- * If a **producer doesn’t carry a schema**:
+Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is
the oldest and V3 is the latest. The following table outlines 8 schema
compatibility strategies and how it works.
- * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is
**disabled** in the namespace to which the topic belongs, the producer is
allowed to connect to the topic and produce data.
-
- * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is
**enabled** in the namespace to which the topic belongs, the producer is
rejected and disconnected.
+| Compatibility check strategy | Definition | Changes allowed |
Check against which schema |
+| --- | --- | --- | --- |
+| `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | All
changes are allowed | All previous versions |
+| `ALWAYS_INCOMPATIBLE` | Disable schema evolution, that is, any schema
change is rejected. | No change is allowed | N/A |
+| `BACKWARD` | Consumers using schema V3 can process data written by
producers using the **last schema version** V2. | <li>Add optional fields
</li><li>Delete fields </li> | Latest version |
+| `BACKWARD_TRANSITIVE` | Consumers using schema V3 can process data
written by producers using **all previous schema versions** V2 and V1. |
<li>Add optional fields </li><li>Delete fields </li> | All previous versions
|
+| `FORWARD` | Consumers using the **last schema version** V2 can process
data written by producers using a new schema V3, even though they may not be
able to use the full capabilities of the new schema. | <li>Add fields
</li><li>Delete optional fields </li> | Latest version |
+| `FORWARD_TRANSITIVE` | Consumers using **all previous schema versions**
V2 or V1 can process data written by producers using a new schema V3. |
<li>Add fields </li><li>Delete optional fields </li> | All previous versions
|
+| `FULL` | Schemas are both backward and forward compatible. <li>Consumers
using the last schema V2 can process data written by producers using the new
schema V3. </li><li>Consumers using the new schema V3 can process data written
by producers using the last schema V2.</li> | <li>Modify optional fields
</li> | Latest version |
+| `FULL_TRANSITIVE` | Backward and forward compatible among schema V3, V2,
and V1. <li>Consumers using the schema V3 can process data written by producers
using schema V2 and V1. </li><li>Consumers using the schema V2 or V1 can
process data written by producers using the schema V3.</li> | <li>Modify
optional fields </li> | All previous versions |
- * If a **producer carries a schema**:
-
- A broker performs the compatibility check based on the configured
compatibility check strategy of the namespace to which the topic belongs.
-
- * If the schema is registered, a producer is connected to a broker.
-
- * If the schema is not registered:
-
- * If `isAllowAutoUpdateSchema` sets to **false**, the producer is
rejected to connect to a broker.
+:::tip
+
+* The default schema compatibility check strategy varies depending on schema
types.
+ * For Avro and JSON, the default one is `FULL`.
+ * For others, the default one is `ALWAYS_INCOMPATIBLE`.
+* For more instructions, see [Admin
API](admin-api-schemas.md#set-schema-compatibility-check-strategy).
+
+:::
+
+### Schema AutoUpdate
+
+By default, schema `AutoUpdate` is enabled. When a schema passes the schema
compatibility check, the producer automatically updates this schema to the
topic it produces.
+
+#### Producer side
+
+For a producer, the `AutoUpdate` happens in the following cases:
+
+* If a **topic doesn’t have a schema** (meaning the data is in raw bytes),
Pulsar registers the schema automatically.
+
+* If a **topic has a schema** and the **producer doesn’t carry any schema**
(meaning it produces raw bytes):
+
+ * If [schema validation](#schema-validation) is **disabled**
(`schemaValidationEnforced`=`false`) in the namespace that the topic belongs
to, the producer is allowed to connect to the topic and produce data.
- * If `isAllowAutoUpdateSchema` sets to **true**:
-
- * If the schema passes the compatibility check, then the broker
registers a new schema automatically for the topic and the producer is
connected.
-
- * If the schema does not pass the compatibility check, then the
broker does not register a schema and the producer is rejected to connect to a
broker.
+ * Otherwise, the producer is rejected.
-
+ * If a **topic has a schema** and the **producer carries a schema**, see
[How schema works on producer side](schema-overview.md#producer-side) for more
information.
-### AutoUpdate for consumer
+#### Consumer side
For a consumer, the `AutoUpdate` happens in the following cases:
-* If a **consumer connects to a topic without a schema** (which means the
consumer receiving raw bytes), the consumer can connect to the topic
successfully without doing any compatibility check.
+* If a consumer connects to a topic **without a schema** (meaning it consumes
raw bytes), the consumer can connect to the topic successfully without doing
any compatibility check.
-* If a **consumer connects to a topic with a schema**.
+* If a consumer connects to a topic **with a schema**, see [How schema works
on consumer side](schema-overview.md#consumer-side) for more information.
- * If a topic does not have all of them (a schema/data/a local consumer and a
local producer):
-
- * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer
registers a schema and it is connected to a broker.
-
- * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is
rejected to connect to a broker.
-
- * If a topic has one of them (a schema/data/a local consumer and a local
producer), then the schema compatibility check is performed.
-
- * If the schema passes the compatibility check, then the consumer is
connected to the broker.
-
- * If the schema does not pass the compatibility check, then the consumer
is rejected to connect to the broker.
-
-
\ No newline at end of file
+### Order of upgrading clients
+
+To adapt to schema evolution and auto-update, you need to upgrade your client
applications accordingly. The upgrade order may vary depending on the
configured [schema compatibility check
strategy](#schema-compatibility-check-strategy).
+
+The following table outlines the mapping between the schema compatibility
check strategy and the upgrade order of clients.
+
+| Compatibility check strategy | Upgrade order | Description
|
+| --- | ---
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `ALWAYS_COMPATIBLE` | Any order | The compatibility check is disabled.
Consequently, you can upgrade the producers and consumers in **any order**.
|
+| `ALWAYS_INCOMPATIBLE` | N/A | The schema evolution is disabled.
|
+| <li>`BACKWARD` </li><li>`BACKWARD_TRANSITIVE` </li> | Consumer first |
There is no guarantee that consumers using the old schema can read data
produced using the new schema. Consequently, **upgrade all consumers first**,
and then start producing new data.
|
+| <li>`FORWARD` </li><li>`FORWARD_TRANSITIVE` </li> | Producer first |
There is no guarantee that consumers using the new schema can read data
produced using the old schema. Consequently, **upgrade all producers
first**<li>to use the new schema and ensure that the data already produced
using the old schemas are not available to consumers, and then upgrades the
consumers. </li> |
Review Comment:
```suggestion
| <li>`FORWARD` </li><li>`FORWARD_TRANSITIVE` </li> | Producer first |
There is no guarantee that consumers using the new schema can read data
produced using the old schema. Consequently, **upgrade all producers first** to
use the new schema and ensure that the data already produced using the old
schemas are not available to consumers, and then upgrades the consumers. |
```
There is a redundant `<li>` tag after **upgrade all producers first** (and its closing `</li>` at the end of the cell).
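As a side note for readers of the compatibility table in this hunk, the "Changes allowed" column can be illustrated with a toy model. This is only a sketch under stated assumptions: a schema is reduced to a mapping from field name to whether the field is optional, and the helpers `can_read` and `is_compatible` are hypothetical names, not Pulsar APIs. The broker's real checkers operate on actual Avro, JSON, or Protobuf definitions.

```python
# Toy model: a schema is a dict mapping field name -> True if the field is
# optional (has a default), False if it is required.
# Hypothetical helpers for illustration only, not Pulsar APIs.


def can_read(reader, writer):
    """Return True if data written with `writer` can be read with `reader`.

    Every field the reader expects but the writer did not produce
    must be optional in the reader's schema.
    """
    return all(optional for name, optional in reader.items() if name not in writer)


def is_compatible(strategy, new, previous):
    """Check `new` against `previous`, a list of schemas ordered oldest first."""
    if not previous:
        return True  # nothing to compare against: the first schema is stored
    if strategy == "ALWAYS_COMPATIBLE":
        return True
    if strategy == "ALWAYS_INCOMPATIBLE":
        return False
    transitive = strategy.endswith("_TRANSITIVE")
    base = strategy[: -len("_TRANSITIVE")] if transitive else strategy
    # Non-transitive strategies only look at the latest stored version.
    targets = previous if transitive else previous[-1:]
    checks = {
        "BACKWARD": lambda old: can_read(new, old),
        "FORWARD": lambda old: can_read(old, new),
        "FULL": lambda old: can_read(new, old) and can_read(old, new),
    }
    return all(checks[base](old) for old in targets)


v1 = {"id": False}
v2 = {"id": False, "name": True}                 # adds an optional field
v3 = {"id": False, "name": True, "age": False}   # adds a required field

print(is_compatible("BACKWARD", v2, [v1]))       # True
print(is_compatible("BACKWARD", v3, [v1, v2]))   # False
print(is_compatible("FORWARD", v3, [v1, v2]))    # True
```

In this model, adding an optional field passes `BACKWARD`, while adding a required field only passes `FORWARD`, which matches the rows of the table.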
##########
site2/docs/schema-get-started.md:
##########
@@ -11,80 +11,283 @@ import TabItem from '@theme/TabItem';
````
-This hands-on tutorial provides instructions and examples on how to construct
and customize schemas.
+This hands-on tutorial provides instructions and examples on how to construct
schemas. For the instructions on administrative tasks, see [Manage
schema](admin-api-schemas.md).
-## Construct a string schema
+## Construct a schema
-This example demonstrates how to construct a [string
schema](schema-understand.md#primitive-type) and use it to produce and consume
messages in Java.
+- [Construct a bytes schema](#bytes)
+- [Construct a string schema](#string)
+- [Construct a key/value schema](#keyvalue)
+- [Construct an Avro schema](#avro)
+- [Construct a JSON schema](#json)
+- [Construct a Protobuf schema](#protobuf)
+- [Construct a ProtobufNative schema](#protobufnative)
+- [Construct a native Avro schema](#nativeavro)
+- [Construct an AUTO_PRODUCE schema](#auto_produce)
+- [Construct an AUTO_CONSUME schema](#auto_consume)
-1. Create a producer with a string schema and send messages.
+### `bytes`
+
+This example demonstrates how to construct a [bytes
schema](schema-understand.md#primitive-type) using language-specific clients
and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+```java
+ .topic("my-topic")
+ .create();
+Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic("my-topic")
+ .subscriptionName("my-sub")
+ .subscribe();
+
+producer.newMessage().value("message".getBytes()).send();
+
+Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+```
+
+</TabItem>
+<TabItem value="C++">
+
+```cpp
+SchemaInfo schemaInfo = SchemaInfo(SchemaType::BYTES, "Bytes", "");
+Producer producer;
+client.createProducer("topic-bytes",
ProducerConfiguration().setSchema(schemaInfo), producer);
+std::array<char, 1024> buffer;
+producer.send(MessageBuilder().setContent(buffer.data(),
buffer.size()).build());
+Consumer consumer;
+res = client.subscribe("topic-bytes", "my-sub",
ConsumerConfiguration().setSchema(schemaInfo), consumer);
+Message msg;
+consumer.receive(msg, 3000);
+```
+
+</TabItem>
+<TabItem value="Python">
+
+```python
+producer = client.create_producer(
+'bytes-schema-topic',
+schema=BytesSchema())
+producer.send(b"Hello")
+
+consumer = client.subscribe(
+'bytes-schema-topic',
+ 'sub',
+ schema=BytesSchema())
+msg = consumer.receive()
+data = msg.value()
+```
Review Comment:
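The indentation can be improved here: the arguments to `create_producer` are not indented, and the arguments to `subscribe` are indented inconsistently.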
##########
site2/docs/schema-get-started.md:
##########
@@ -11,80 +11,283 @@ import TabItem from '@theme/TabItem';
````
-This hands-on tutorial provides instructions and examples on how to construct
and customize schemas.
+This hands-on tutorial provides instructions and examples on how to construct
schemas. For the instructions on administrative tasks, see [Manage
schema](admin-api-schemas.md).
-## Construct a string schema
+## Construct a schema
-This example demonstrates how to construct a [string
schema](schema-understand.md#primitive-type) and use it to produce and consume
messages in Java.
+- [Construct a bytes schema](#bytes)
+- [Construct a string schema](#string)
+- [Construct a key/value schema](#keyvalue)
+- [Construct an Avro schema](#avro)
+- [Construct a JSON schema](#json)
+- [Construct a Protobuf schema](#protobuf)
+- [Construct a ProtobufNative schema](#protobufnative)
+- [Construct a native Avro schema](#nativeavro)
+- [Construct an AUTO_PRODUCE schema](#auto_produce)
+- [Construct an AUTO_CONSUME schema](#auto_consume)
-1. Create a producer with a string schema and send messages.
+### `bytes`
+
+This example demonstrates how to construct a [bytes
schema](schema-understand.md#primitive-type) using language-specific clients
and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+```java
+ .topic("my-topic")
+ .create();
+Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic("my-topic")
+ .subscriptionName("my-sub")
+ .subscribe();
+
+producer.newMessage().value("message".getBytes()).send();
+
+Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+```
+
+</TabItem>
+<TabItem value="C++">
+
+```cpp
+SchemaInfo schemaInfo = SchemaInfo(SchemaType::BYTES, "Bytes", "");
+Producer producer;
+client.createProducer("topic-bytes",
ProducerConfiguration().setSchema(schemaInfo), producer);
+std::array<char, 1024> buffer;
+producer.send(MessageBuilder().setContent(buffer.data(),
buffer.size()).build());
+Consumer consumer;
+res = client.subscribe("topic-bytes", "my-sub",
ConsumerConfiguration().setSchema(schemaInfo), consumer);
+Message msg;
+consumer.receive(msg, 3000);
+```
+
+</TabItem>
+<TabItem value="Python">
+
+```python
+producer = client.create_producer(
+'bytes-schema-topic',
+schema=BytesSchema())
+producer.send(b"Hello")
+
+consumer = client.subscribe(
+'bytes-schema-topic',
+ 'sub',
+ schema=BytesSchema())
+msg = consumer.receive()
+data = msg.value()
+```
+
+</TabItem>
+<TabItem value="Go">
+
+```go
+producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewBytesSchema(nil),
+})
+id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+ Value: []byte("message"),
+})
+
+consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewBytesSchema(nil),
+ SubscriptionName: "my-sub",
+ Type: pulsar.Exclusive,
+})
+```
+
+</TabItem>
+</Tabs>
+````
+
+### `string`
+
+This example demonstrates how to construct a [string
schema](schema-understand.md#primitive-type) using language-specific clients
and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
```java
Producer<String> producer = client.newProducer(Schema.STRING).create();
producer.newMessage().value("Hello Pulsar!").send();
- ```
-2. Create a consumer with a string schema and receive messages.
-
- ```java
Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
- consumer.receive();
+ Message<String> message = consumer.receive();
```
-## Construct a key/value schema
+</TabItem>
+<TabItem value="C++">
+
+```cpp
+SchemaInfo schemaInfo = SchemaInfo(SchemaType::STRING, "String", "");
+Producer producer;
+client.createProducer("topic-string",
ProducerConfiguration().setSchema(schemaInfo), producer);
+producer.send(MessageBuilder().setContent("message").build());
+
+Consumer consumer;
+client.subscribe("topic-string", "my-sub",
ConsumerConfiguration().setSchema(schemaInfo), consumer);
+Message msg;
+consumer.receive(msg, 3000);
+```
-This example shows how to construct a [key/value
schema](schema-understand.md#keyvalue-schema) and use it to produce and consume
messages in Java.
+</TabItem>
+<TabItem value="Python">
+
+```python
+producer = client.create_producer(
+ 'string-schema-topic',
+ schema=StringSchema())
+producer.send("Hello")
+
+consumer = client.subscribe(
+ 'string-schema-topic',
+ 'sub',
+ schema=StringSchema())
+msg = consumer.receive()
+str = msg.value()
+```
+
+</TabItem>
+<TabItem value="Go">
+
+```go
+producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewStringSchema(nil),
+})
+id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+ Value: "message",
+})
+
+consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewStringSchema(nil),
+ SubscriptionName: "my-sub",
+ Type: pulsar.Exclusive,
+})
+msg, err := consumer.Receive(context.Background())
Review Comment:
Same suggestion as above.
##########
site2/docs/schema-get-started.md:
##########
@@ -178,10 +396,10 @@ You can define the `schemaDefinition` to generate a
`struct` schema.
```java
SchemaDefinition<User> schemaDefinition =
SchemaDefinition.<User>builder().withPojo(User.class).build();
Producer<User> producer =
client.newProducer(Schema.AVRO(schemaDefinition)).create();
-
producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
+ producer.newMessage().value(new User (“pulsar-user”, 1)).send();
Review Comment:
```suggestion
producer.newMessage().value(new User("pulsar-user", 1)).send();
```
Full-width quotes should only be used in Chinese text; string literals in code need ASCII double quotes.
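A check along the following lines could catch such quotes before review. It is a sketch only: the function name `find_typographic_quotes` and the set of flagged characters are assumptions for illustration, not an existing tool in the Pulsar docs build.

```python
# Quote characters that often sneak into code samples via word processors
# or input methods. The selection is an assumption, not an exhaustive list.
SUSPECT_QUOTES = {
    "\u201c": "left double quotation mark",
    "\u201d": "right double quotation mark",
    "\u2018": "left single quotation mark",
    "\u2019": "right single quotation mark",
    "\uff02": "fullwidth quotation mark",
    "\uff07": "fullwidth apostrophe",
}

FENCE = "`" * 3  # Markdown code fence marker


def find_typographic_quotes(markdown_text):
    """Return (line_number, column, name) for suspect quotes inside fenced code.

    Simplification: any line starting with the fence marker toggles the
    "inside a code block" state, so nested fences are not handled.
    """
    findings = []
    in_fence = False
    for line_number, line in enumerate(markdown_text.splitlines(), start=1):
        if line.lstrip().startswith(FENCE):
            in_fence = not in_fence
            continue
        if not in_fence:
            continue
        for column, char in enumerate(line, start=1):
            if char in SUSPECT_QUOTES:
                findings.append((line_number, column, SUSPECT_QUOTES[char]))
    return findings


sample = "\n".join([
    FENCE + "java",
    "producer.newMessage().value(new User(\u201cpulsar-user\u201d, 1)).send();",
    FENCE,
])
for finding in find_typographic_quotes(sample):
    print(finding)
```

Quotes in prose outside code fences are deliberately ignored, since typographic quotes can be legitimate there.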
##########
site2/docs/schema-get-started.md:
##########
@@ -164,12 +380,14 @@ You can define the `schemaDefinition` to generate a
`struct` schema.
1. Create the _User_ class to define the messages sent to Pulsar topics.
```java
- @Builder
- @AllArgsConstructor
- @NoArgsConstructor
public static class User {
- String name;
- int age;
+ public String name;
+ public int age;
+ public User(String name, int age) {
+ this.name = name;
+ this.age = age
Review Comment:
I suggest using the same indentation throughout (4 spaces).
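For what it is worth, a small script can flag lines that break the 4-space convention. The sketch below makes several assumptions: `find_odd_indents` is a hypothetical helper, and the sample is a made-up variant of the `User` class with one deliberately misindented line, because the quoted diff does not preserve the original whitespace.

```python
def find_odd_indents(code, width=4):
    """Return (line_number, indent) for lines whose leading spaces
    are not a multiple of `width`. Blank lines are skipped."""
    findings = []
    for line_number, line in enumerate(code.splitlines(), start=1):
        if not line.strip():
            continue  # blank lines carry no indentation information
        indent = len(line) - len(line.lstrip(" "))
        if indent % width != 0:
            findings.append((line_number, indent))
    return findings


# Hypothetical sample: line 5 is indented by 6 spaces instead of 8.
sample = "\n".join([
    "public static class User {",
    " " * 4 + "public String name;",
    " " * 4 + "public int age;",
    " " * 4 + "public User(String name, int age) {",
    " " * 6 + "this.name = name;",
    " " * 8 + "this.age = age;",
    " " * 4 + "}",
    "}",
])
print(find_odd_indents(sample))  # [(5, 6)]
```

The check only looks at leading spaces, so it would not catch tab-indented code or arguments that are consistently but unusually aligned.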
##########
site2/docs/schema-get-started.md:
##########
@@ -228,88 +452,349 @@ Producer<SensorReading> producer =
client.newProducer(AvroSchema.of(SensorReadin
.create();
```
-### Avro-based schema using Java
-
-The following schema formats are currently available for Java:
+</TabItem>
+<TabItem value="C++">
-* No schema or the byte array schema (which can be applied using
`Schema.BYTES`):
+ ```cpp
+ // Send messages
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ Producer producer;
+ ProducerConfiguration producerConf;
+ producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.createProducer("topic-avro", producerConf, producer);
- ```java
- Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
- .topic("some-raw-bytes-topic")
- .create();
+ // Receive messages
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ ConsumerConfiguration consumerConf;
+ Consumer consumer;
+ consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.subscribe("topic-avro", "sub-2", consumerConf, consumer)
```
- Or, equivalently:
+</TabItem>
+<TabItem value="Python">
- ```java
- Producer<byte[]> bytesProducer = client.newProducer()
- .topic("some-raw-bytes-topic")
- .create();
- ```
+You can declare an `AvroSchema` using Python through one of the following
methods.
-* `String` for normal UTF-8-encoded string data. Apply the schema using
`Schema.STRING`:
+**Method 1: Record**
- ```java
- Producer<String> stringProducer = client.newProducer(Schema.STRING)
- .topic("some-string-topic")
- .create();
- ```
+Declare an `AvroSchema` by passing a class that inherits from
`pulsar.schema.Record` and defines the fields as class variables.
-* Create JSON schemas for POJOs using `Schema.JSON`. The following is an
example.
+```python
+class Example(Record):
+ a = Integer()
+ b = Integer()
- ```java
- Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
- .topic("some-pojo-topic")
- .create();
- ```
+producer = client.create_producer(
+'avro-schema-topic',
+schema=AvroSchema(Example))
+r = Example(a=1, b=2)
+producer.send(r)
-* Generate Protobuf schemas using `Schema.PROTOBUF`. The following example
shows how to create the Protobuf schema and use it to instantiate a new
producer:
+consumer = client.subscribe(
+'avro-schema-topic',
+ 'sub',
+ schema=AvroSchema(Example))
+msg = consumer.receive()
+e = msg.value()
Review Comment:
Same indent style problem as above.
##########
site2/docs/schema-get-started.md:
##########
@@ -11,80 +11,283 @@ import TabItem from '@theme/TabItem';
````
-This hands-on tutorial provides instructions and examples on how to construct
and customize schemas.
+This hands-on tutorial provides instructions and examples on how to construct
schemas. For the instructions on administrative tasks, see [Manage
schema](admin-api-schemas.md).
-## Construct a string schema
+## Construct a schema
-This example demonstrates how to construct a [string
schema](schema-understand.md#primitive-type) and use it to produce and consume
messages in Java.
+- [Construct a bytes schema](#bytes)
+- [Construct a string schema](#string)
+- [Construct a key/value schema](#keyvalue)
+- [Construct an Avro schema](#avro)
+- [Construct a JSON schema](#json)
+- [Construct a Protobuf schema](#protobuf)
+- [Construct a ProtobufNative schema](#protobufnative)
+- [Construct a native Avro schema](#nativeavro)
+- [Construct an AUTO_PRODUCE schema](#auto_produce)
+- [Construct an AUTO_CONSUME schema](#auto_consume)
-1. Create a producer with a string schema and send messages.
+### `bytes`
+
+This example demonstrates how to construct a [bytes
schema](schema-understand.md#primitive-type) using language-specific clients
and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+```java
+ .topic("my-topic")
+ .create();
+Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic("my-topic")
+ .subscriptionName("my-sub")
+ .subscribe();
+
+producer.newMessage().value("message".getBytes()).send();
+
+Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+```
+
+</TabItem>
+<TabItem value="C++">
+
+```cpp
+SchemaInfo schemaInfo = SchemaInfo(SchemaType::BYTES, "Bytes", "");
+Producer producer;
+client.createProducer("topic-bytes",
ProducerConfiguration().setSchema(schemaInfo), producer);
+std::array<char, 1024> buffer;
+producer.send(MessageBuilder().setContent(buffer.data(),
buffer.size()).build());
+Consumer consumer;
+res = client.subscribe("topic-bytes", "my-sub",
ConsumerConfiguration().setSchema(schemaInfo), consumer);
+Message msg;
+consumer.receive(msg, 3000);
+```
+
+</TabItem>
+<TabItem value="Python">
+
+```python
+producer = client.create_producer(
+'bytes-schema-topic',
+schema=BytesSchema())
+producer.send(b"Hello")
+
+consumer = client.subscribe(
+'bytes-schema-topic',
+ 'sub',
+ schema=BytesSchema())
+msg = consumer.receive()
+data = msg.value()
+```
+
+</TabItem>
+<TabItem value="Go">
+
+```go
+producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewBytesSchema(nil),
+})
+id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+ Value: []byte("message"),
+})
+
+consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewBytesSchema(nil),
+ SubscriptionName: "my-sub",
+ Type: pulsar.Exclusive,
+})
+```
Review Comment:
I suggest using 4 spaces or a tab for indentation in the Go code.
##########
site2/docs/schema-get-started.md:
##########
@@ -228,88 +452,349 @@ Producer<SensorReading> producer =
client.newProducer(AvroSchema.of(SensorReadin
.create();
```
-### Avro-based schema using Java
-
-The following schema formats are currently available for Java:
+</TabItem>
+<TabItem value="C++">
-* No schema or the byte array schema (which can be applied using
`Schema.BYTES`):
+ ```cpp
+ // Send messages
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ Producer producer;
+ ProducerConfiguration producerConf;
+ producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.createProducer("topic-avro", producerConf, producer);
- ```java
- Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
- .topic("some-raw-bytes-topic")
- .create();
+ // Receive messages
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ ConsumerConfiguration consumerConf;
+ Consumer consumer;
+ consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.subscribe("topic-avro", "sub-2", consumerConf, consumer)
```
- Or, equivalently:
+</TabItem>
+<TabItem value="Python">
- ```java
- Producer<byte[]> bytesProducer = client.newProducer()
- .topic("some-raw-bytes-topic")
- .create();
- ```
+You can declare an `AvroSchema` using Python through one of the following
methods.
-* `String` for normal UTF-8-encoded string data. Apply the schema using
`Schema.STRING`:
+**Method 1: Record**
- ```java
- Producer<String> stringProducer = client.newProducer(Schema.STRING)
- .topic("some-string-topic")
- .create();
- ```
+Declare an `AvroSchema` by passing a class that inherits from
`pulsar.schema.Record` and defines the fields as class variables.
-* Create JSON schemas for POJOs using `Schema.JSON`. The following is an
example.
+```python
+class Example(Record):
+ a = Integer()
+ b = Integer()
- ```java
- Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
- .topic("some-pojo-topic")
- .create();
- ```
+producer = client.create_producer(
+'avro-schema-topic',
+schema=AvroSchema(Example))
+r = Example(a=1, b=2)
+producer.send(r)
-* Generate Protobuf schemas using `Schema.PROTOBUF`. The following example
shows how to create the Protobuf schema and use it to instantiate a new
producer:
+consumer = client.subscribe(
+'avro-schema-topic',
+ 'sub',
+ schema=AvroSchema(Example))
+msg = consumer.receive()
+e = msg.value()
+```
- ```java
- Producer<MyProtobuf> protobufProducer =
client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
- .topic("some-protobuf-topic")
- .create();
- ```
+**Method 2: JSON definition**
+
+1. Declare an `AvroSchema` using JSON. In this case, Avro schemas are defined
using JSON.
+
+ Below is an example of `AvroSchema` defined using a JSON file
(`company.avsc`).
+
+ ```json
+ {
+ "doc": "this is doc",
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "Company",
+ "fields": [
+ {"name": "name", "type": ["null", "string"]},
+ {"name": "address", "type": ["null", "string"]},
+ {"name": "employees", "type": ["null", {"type": "array", "items": {
+ "type": "record",
+ "name": "Employee",
+ "fields": [
+ {"name": "name", "type": ["null", "string"]},
+ {"name": "age", "type": ["null", "int"]}
+ ]
+ }}]},
+ {"name": "labels", "type": ["null", {"type": "map", "values":
"string"}]}
+ ]
+ }
+ ```
-* Define Avro schemas with `Schema.AVRO`. The following code snippet
demonstrates how to create and use Avro schema.
+2. Load a schema definition from a file by using
[`avro.schema`](https://avro.apache.org/docs/current/getting-started-python/)
or
[`fastavro.schema`](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema).
+
+ If you use the [JSON definition](#method-2-json-definition) method to
declare an `AvroSchema`, you need to:
+ - Use [Python dict](https://developers.google.com/edu/python/dict-files) to
produce and consume messages, which is different from using the
[Record](#method-1-record) method.
+ - Set the value of the `_record_cls` parameter to `None` when generating an
`AvroSchema` object.
+
+ **Example**
+
+ ```python
+ from fastavro.schema import load_schema
+ from pulsar.schema import *
+ schema_definition = load_schema("examples/company.avsc")
+ avro_schema = AvroSchema(None, schema_definition=schema_definition)
+ producer = client.create_producer(
+ topic=topic,
+ schema=avro_schema)
+ consumer = client.subscribe(topic, 'test', schema=avro_schema)
+ company = {
+ "name": "company-name" + str(i),
+ "address": 'xxx road xxx street ' + str(i),
+ "employees": [
+ {"name": "user" + str(i), "age": 20 + i},
+ {"name": "user" + str(i), "age": 30 + i},
+ {"name": "user" + str(i), "age": 35 + i},
+ ],
+ "labels": {
+ "industry": "software" + str(i),
+ "scale": ">100",
+ "funds": "1000000.0"
+ }
+ }
+ producer.send(company)
+ msg = consumer.receive()
+ # Users could get a dict object by `value()` method.
+ msg.value()
+ ```
- ```java
- Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
- .topic("some-avro-topic")
- .create();
- ```
+</TabItem>
+<TabItem value="Go">
+Suppose you have an `avroExampleStruct` class as follows, and you'd like to
transmit it over a Pulsar topic.
-### Avro schema using C++
+```go
+ type avroExampleStruct struct {
+ ID int
+ Name string
+}
+```
Review Comment:
Same indent style problem as above.