calindima opened a new issue, #1296:
URL: https://github.com/apache/pulsar-client-go/issues/1296
#### Expected behavior
Setup:
- topic has a schema uploaded to the registry
- schemaEnforced is set to true
- allowAutoUpdates is set to false
- producer is created successfully with a JSON schema that follows the definition from the registry
Expected:
- producer can only publish messages that are compliant with the schema
#### Actual behavior
The producer created with a schema can publish non-compliant messages. The
payload is not validated against the schema.
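As a point of comparison, the check that was expected to happen can be approximated on the application side before calling `Send`. The sketch below is only an illustration: it assumes the client serializes the value with plain JSON marshalling and performs no further checks, and `validateAgainstSchemaTest` is a hypothetical helper, not part of pulsar-client-go. It only checks field names and Go-level types for this one record, not the full Avro-style schema definition.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// SchemaTest mirrors the record registered for the topic.
type SchemaTest struct {
	UserName string `json:"userName"`
	UserAge  int    `json:"userAge"`
}

// WrongSchema is the non-compliant payload from the report.
type WrongSchema struct {
	NotUserName string `json:"notUserName"`
	NotUserAge  int    `json:"notUserAge"`
}

// validateAgainstSchemaTest is a hypothetical helper (not a library API).
// It marshals v to JSON, checks that both expected fields are present,
// and then strictly decodes into SchemaTest so unknown fields are rejected.
func validateAgainstSchemaTest(v any) error {
	raw, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("marshal payload: %w", err)
	}

	// Presence check: a plain decode would silently leave missing fields at zero values.
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(raw, &fields); err != nil {
		return fmt.Errorf("payload is not a JSON object: %w", err)
	}
	for _, name := range []string{"userName", "userAge"} {
		if _, ok := fields[name]; !ok {
			return fmt.Errorf("missing required field %q", name)
		}
	}

	// Strict decode: rejects unknown fields and mismatched types.
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
	var out SchemaTest
	if err := dec.Decode(&out); err != nil {
		return fmt.Errorf("payload does not match SchemaTest: %w", err)
	}
	return nil
}

func main() {
	good := &SchemaTest{UserName: "JohnDoe", UserAge: 30}
	bad := &WrongSchema{NotUserName: "JohnDoe", NotUserAge: 30}

	fmt.Println("good:", validateAgainstSchemaTest(good))
	fmt.Println("bad:", validateAgainstSchemaTest(bad))
}
```

Calling such a helper before `producer.Send` would reject the `WrongSchema` value used in the reproduction, which is the behavior expected from the producer itself.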
#### Steps to reproduce
The first 3 steps from setup can be done through the admin API:
```bash
curl --request POST \
  --url $BASE_URL/admin/v2/schemas/$TENANT/$NAMESPACE/$TOPIC/schema \
  --header 'Content-Type: application/json' \
  --data '{
    "type": "JSON",
    "schema": "{\n \"type\": \"record\",\n \"name\": \"SchemaTest\",\n \"fields\": [\n {\n \"name\": \"userName\",\n \"type\": \"string\"\n },\n {\n \"name\": \"userAge\",\n \"type\": \"int\"\n }\n ]\n}",
    "properties": {}
  }'
```
```bash
curl --request POST \
  --url $BASE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/schemaValidationEnforced \
  --header 'Content-Type: application/json' \
  --data true
```
```bash
curl --request POST \
  --url $BASE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/isAllowAutoUpdateSchema \
  --header 'Content-Type: application/json' \
  --data false
```
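The same three admin calls can also be scripted in Go, which avoids hand-escaping the schema definition inside the request body. This is a rough sketch that mirrors the curl commands above: `buildRequests` and `adminRequest` are hypothetical names, the environment variables are the same ones the curl commands use, and no authentication header is sent (add one if the cluster requires it).

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strings"
)

// adminRequest is one POST against the Pulsar admin REST API.
type adminRequest struct {
	url  string
	body string
}

// buildRequests returns the three setup calls in order:
// schema upload, schemaValidationEnforced=true, isAllowAutoUpdateSchema=false.
func buildRequests(baseURL, tenant, namespace, topic, schemaDef string) ([]adminRequest, error) {
	// json.Marshal escapes the schema definition into a JSON string.
	payload, err := json.Marshal(struct {
		Type       string            `json:"type"`
		Schema     string            `json:"schema"`
		Properties map[string]string `json:"properties"`
	}{Type: "JSON", Schema: schemaDef, Properties: map[string]string{}})
	if err != nil {
		return nil, err
	}

	admin := baseURL + "/admin/v2"
	ns := tenant + "/" + namespace
	return []adminRequest{
		{admin + "/schemas/" + ns + "/" + topic + "/schema", string(payload)},
		{admin + "/persistent/" + ns + "/" + topic + "/schemaValidationEnforced", "true"},
		{admin + "/namespaces/" + ns + "/isAllowAutoUpdateSchema", "false"},
	}, nil
}

func main() {
	baseURL := os.Getenv("BASE_URL")
	if baseURL == "" {
		fmt.Println("BASE_URL is not set; nothing to do")
		return
	}

	schemaDef := `{"type":"record","name":"SchemaTest","fields":[` +
		`{"name":"userName","type":"string"},{"name":"userAge","type":"int"}]}`

	reqs, err := buildRequests(baseURL, os.Getenv("TENANT"), os.Getenv("NAMESPACE"), os.Getenv("TOPIC"), schemaDef)
	if err != nil {
		fmt.Fprintln(os.Stderr, "building requests failed:", err)
		os.Exit(1)
	}

	for _, r := range reqs {
		resp, err := http.Post(r.url, "application/json", strings.NewReader(r.body))
		if err != nil {
			fmt.Fprintln(os.Stderr, "request failed:", err)
			os.Exit(1)
		}
		resp.Body.Close()
		fmt.Println(resp.Status, r.url)
	}
}
```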
The code:
```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	pulsarauth "github.com/apache/pulsar-client-go/pulsar/auth"
	"github.com/google/uuid"
)

// SchemaTest matches the schema registered for the topic.
type SchemaTest struct {
	UserName string `json:"userName" avro:"userName"`
	UserAge  int    `json:"userAge" avro:"userAge"`
}

// WrongSchema deliberately does not match the registered schema.
type WrongSchema struct {
	NotUserName string `json:"notUserName" avro:"notUserName"`
	NotUserAge  int    `json:"notUserAge" avro:"notUserAge"`
}

func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))

	// Load settings here or hard-code them (the values below are placeholders)
	serviceURL := "insert-your-service-url-here"
	topicAddress := "insert-your-topic-address-here"
	token := "insert-your-token-here"
	authProvider := pulsarauth.NewAuthenticationToken(token)

	// The original report used a project-local helper,
	// client.CreatePulsarClient(settings.Url, authProvider);
	// pulsar.NewClient is used here so the snippet is self-contained.
	pulsarClient, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            serviceURL,
		Authentication: authProvider,
	})
	if err != nil {
		slog.Error("Error creating the pulsar client", "error", err)
		return
	}
	defer pulsarClient.Close()

	producerName := fmt.Sprintf("SchemaProducer-%s", uuid.NewString()[0:6])
	schemaDef := `{
  "type": "record",
  "name": "SchemaTest",
  "fields": [
    {
      "name": "userName",
      "type": "string"
    },
    {
      "name": "userAge",
      "type": "int"
    }
  ]
}`
	schemaExample := pulsar.NewJSONSchema(schemaDef, nil)

	producer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
		Name:   producerName,
		Topic:  topicAddress,
		Schema: schemaExample,
	})
	if err != nil {
		slog.Error("Error creating the pulsar producer", "error", err)
		return
	}
	defer producer.Close()

	for {
		//goodMsg := &SchemaTest{
		//	UserName: "JohnDoe",
		//	UserAge:  30,
		//}
		badMsg := &WrongSchema{
			NotUserName: "JohnDoe",
			NotUserAge:  30,
		}

		// Sending badMsg is expected to fail, but it is published successfully.
		msgId, sendErr := producer.Send(context.Background(), &pulsar.ProducerMessage{
			//Value: goodMsg,
			Value: badMsg,
			//Schema: schemaExample,
		})
		if sendErr != nil {
			slog.Error("Error sending message", "error", sendErr)
			break
		}

		//slog.Info("Published message: ", slog.String("messageId", msgId.String()), slog.Any("message", goodMsg))
		slog.Info("Published message: ", slog.String("messageId", msgId.String()), slog.Any("message", badMsg))
		time.Sleep(time.Second * 1)
	}
}
```
The schema from the registry (HTTP response dump):
```json
{
  "version": 0,
  "type": "JSON",
  "timestamp": 1728915217796,
  "data": "{\n \"type\": \"record\",\n \"name\": \"SchemaTest\",\n \"fields\": [\n {\n \"name\": \"userName\",\n \"type\": \"string\"\n },\n {\n \"name\": \"userAge\",\n \"type\": \"int\"\n }\n ]\n}",
  "properties": {}
}
```
#### System configuration
**Pulsar version**: 3.0.6.8