calindima opened a new issue, #1296:
URL: https://github.com/apache/pulsar-client-go/issues/1296

   #### Expected behavior
   
   Setup:
   
   - topic has a schema uploaded to the registry
   - schemaEnforced is set to true
   - allowAutoUpdates is set to false
   - producer is created successfully with a JSON schema that follows the 
definition from the registry
   
   Expected:
   
   - producer can only publish messages that are compliant with the schema
   
   #### Actual behavior
   
   The producer created with a schema can publish non-compliant messages. The 
payload is not validated against the schema.
   
   #### Steps to reproduce
   
   The first 3 steps from setup can be done through the admin API:
   
   ```bash
   curl --request POST \
     --url $BASE_URL/admin/v2/schemas/$TENANT/$NAMESPACE/$TOPIC/schema \
     --header 'Content-Type: application/json' \
     --data '{
           "type": "JSON",
           "schema": "{\n  \"type\": \"record\",\n  \"name\": \"SchemaTest\",\n 
 \"fields\": [\n    {\n      \"name\": \"userName\",\n      \"type\": 
\"string\"\n    },\n    {\n      \"name\": \"userAge\",\n      \"type\": 
\"int\"\n    }\n  ]\n}",
           "properties": {}
       }'
   ```
   
   ```bash
   curl --request POST \
     --url 
$BASE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/schemaValidationEnforced
 \
     --header 'Content-Type: application/json' \
     --data true
   ```
   
   ```bash
   curl --request POST \
     --url 
$BASE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/isAllowAutoUpdateSchema \
     --header 'Content-Type: application/json' \
     --data false
   ```
   
   
   The code:
   
   ```go
   package main
   
   import (
        "context"
        "fmt"
        "log/slog"
        "os"
        "time"
   
        "github.com/apache/pulsar-client-go/pulsar"
        pulsarauth "github.com/apache/pulsar-client-go/pulsar/auth"
        "github.com/google/uuid"
   )
   
   type SchemaTest struct {
        UserName string `json:"userName" avro:"userName"`
        UserAge  int    `json:"userAge" avro:"userAge"`
   }
   
   type WrongSchema struct {
        NotUserName string `json:"notUserName" avro:"notUserName"`
        NotUserAge  int    `json:"notUserAge" avro:"notUserAge"`
   }
   
   func main() {
        slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
   
        // Load settings here or hard-code them
   
        token := "insert-your-token-here"
        authProvider := pulsarauth.NewAuthenticationToken(token)
   
        pulsarClient, err := client.CreatePulsarClient(settings.Url, 
authProvider)
        if err != nil {
                slog.Error("Error creating the pulsar client", err)
        }
        defer pulsarClient.Close()
   
        producerName := fmt.Sprintf("SchemaProducer-%s", uuid.NewString()[0:6])
   
        schemaDef := `{
     "type": "record",
     "name": "SchemaTest",
     "fields": [
       {
         "name": "userName",
         "type": "string"
       },
       {
         "name": "userAge",
         "type": "int"
       }
     ]
   }`
        schemaExample := pulsar.NewJSONSchema(schemaDef, nil)
   
        producer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
                Name:   producerName,
                Topic:  settings.TopicAddress,
                Schema: schemaExample,
        })
        if err != nil {
                slog.Error("Error creating the pulsar producer", "error", err)
        }
        defer producer.Close()
   
        for {
                //goodMsg := &SchemaTest{
                //      UserName: "JohnDoe",
                //      UserAge:  30,
                //}
                badMsg := &WrongSchema{
                        NotUserName: "JohnDoe",
                        NotUserAge:  30,
                }
   
                msgId, sendErr := producer.Send(context.Background(), 
&pulsar.ProducerMessage{
                        //Value: goodMsg,
                        Value: badMsg,
                        //Schema: schemaExample,
                })
                if sendErr != nil {
                        slog.Error("Error sending message", "error", sendErr)
                        break
                }
   
                //slog.Info("Published message: ", slog.String("messageId", 
msgId.String()), slog.Any("message", goodMsg))
                slog.Info("Published message: ", slog.String("messageId", 
msgId.String()), slog.Any("message", badMsg))
                time.Sleep(time.Second * 1)
        }
   }
   ```
   
   The schema from the registry (http response dump):
   
   ```json
   {
        "version": 0,
        "type": "JSON",
        "timestamp": 1728915217796,
        "data": "{\n  \"type\": \"record\",\n  \"name\": \"SchemaTest\",\n  
\"fields\": [\n    {\n      \"name\": \"userName\",\n      \"type\": 
\"string\"\n    },\n    {\n      \"name\": \"userAge\",\n      \"type\": 
\"int\"\n    }\n  ]\n}",
        "properties": {}
   }
   ```
   
   #### System configuration
   
   **Pulsar version**: 3.0.6.8
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to