maver1ck edited a comment on pull request #12919:
URL: https://github.com/apache/flink/pull/12919#issuecomment-715222930


   @danny0405 
   I think we have one more problem.
   When Flink is creating schema in registry nullability is not properly set 
for logical types.
   Examples. Table:
   ```
   create table `test_logical_null` (
        `string_field` STRING,
        `timestamp_field` TIMESTAMP(3)
   ) WITH (
     'connector' = 'kafka', 
     'topic' = 'test-logical-null', 
     'properties.bootstrap.servers' = 'localhost:9092', 
     'properties.group.id' = 'test12345', 
      'scan.startup.mode' = 'earliest-offset', 
     'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to 
configure this format.
     'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to 
connect to Confluent Schema Registry
     'avro-confluent.schema-registry.subject' = 'test-logical-null' -- Subject 
name to write to the Schema Registry service; required for sinks
   )
   ```
   Schema:
   ```
   {
     "type": "record",
     "name": "record",
     "fields": [
       {
         "name": "string_field",
         "type": [
           "string",
           "null"
         ]
       },
       {
         "name": "timestamp_field",
         "type": {
           "type": "long",
           "logicalType": "timestamp-millis"
         }
       }
     ]
   }
   ```
   For not null fields:
   ```
   create table `test_logical_notnull` (
        `string_field` STRING NOT NULL,
        `timestamp_field` TIMESTAMP(3) NOT NULL
   ) WITH (
     'connector' = 'kafka', 
     'topic' = 'test-logical-notnull', 
     'properties.bootstrap.servers' = 'localhost:9092', 
     'properties.group.id' = 'test12345', 
      'scan.startup.mode' = 'earliest-offset', 
     'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to 
configure this format.
     'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to 
connect to Confluent Schema Registry
     'avro-confluent.schema-registry.subject' = 'test-logical-notnull-value' -- 
Subject name to write to the Schema Registry service; required for sinks
   );
   ```
   Schema
   ```
   {
     "type": "record",
     "name": "record",
     "fields": [
       {
         "name": "string_field",
         "type": "string"
       },
       {
         "name": "timestamp_field",
         "type": {
           "type": "long",
           "logicalType": "timestamp-millis"
         }
       }
     ]
   }
   ```
   As you can see for string_field we have proper union with null (for nullable 
field). For timestamp_field in both examples union is missing.
   
   EDIT: I added bug report for this
   https://issues.apache.org/jira/browse/FLINK-19786


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to