This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new b86bda3151c [improve][io] KCA: flag to force optional primitive schemas (#19951)
b86bda3151c is described below

commit b86bda3151ca00b3f3ee5918bb4e00870d900865
Author: Andrey Yegorov <[email protected]>
AuthorDate: Wed Mar 29 13:55:22 2023 -0700

    [improve][io] KCA: flag to force optional primitive schemas (#19951)
    
    Motivation
    Kafka's schema has an "optional" flag that is used to validate data and allow nulls.
    Pulsar's schema does not carry such information, which makes the conversion to a Kafka schema lossy.
    
    Modifications
    Added a config parameter that lets one force primitive schemas into optional ones.
    The KV schema is always optional.

    The default is false, to match the existing behavior.
    
    (cherry picked from commit 55523ac8f31fd6d54aacba326edef1f53028877e)
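
    For illustration only (not part of the commit): a short sketch of how the new
    two-argument getKafkaConnectSchema overload and the KeyValue mapping described
    above behave, based on the tests in this patch.

        // Assuming: import org.apache.pulsar.client.api.Schema;   (the Pulsar schema class)
        //           import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;

        // With the flag enabled, primitive Pulsar schemas map to Kafka's OPTIONAL_* schemas.
        org.apache.kafka.connect.data.Schema str =
                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING, true);
        assert str.isOptional();

        // With the flag disabled, the existing non-optional mapping is kept.
        assert !PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING, false).isOptional();

        // KeyValue schemas always map to an optional MAP schema with optional key and value.
        org.apache.kafka.connect.data.Schema kv =
                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES(), false);
        assert kv.isOptional() && kv.keySchema().isOptional() && kv.valueSchema().isOptional();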
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  16 ++-
 .../connect/PulsarKafkaConnectSinkConfig.java      |   6 +
 .../connect/schema/PulsarSchemaToKafkaSchema.java  |  55 +++++++-
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  41 ++++--
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 149 +++++++++++++++------
 5 files changed, 203 insertions(+), 64 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 0f2321eaaa4..c40bcff0162 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -91,6 +91,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
 
     protected String topicName;
+    protected boolean useOptionalPrimitives;
 
     private boolean sanitizeTopicName = false;
     // This is a workaround for https://github.com/apache/pulsar/issues/19922
@@ -165,6 +166,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
         sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
         collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
+        useOptionalPrimitives = kafkaSinkConfig.isUseOptionalPrimitives();
 
         useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
         maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -447,8 +449,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 && sourceRecord.getSchema().getSchemaInfo() != null
                 && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
-            keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
-            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
+            // Assume Key_Value schema's key and value are always optional
+            keySchema = PulsarSchemaToKafkaSchema
+                    .getOptionalKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives);
+            valueSchema = PulsarSchemaToKafkaSchema
+                    .getOptionalKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives);
 
             Object nativeObject = sourceRecord.getValue().getNativeObject();
 
@@ -465,12 +470,13 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         } else {
             if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                 key = sourceRecord.getMessage().get().getKeyBytes();
-                keySchema = Schema.BYTES_SCHEMA;
+                keySchema = useOptionalPrimitives ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
             } else {
                 key = sourceRecord.getKey().orElse(null);
-                keySchema = Schema.STRING_SCHEMA;
+                keySchema = useOptionalPrimitives ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
             }
-            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
+            valueSchema = PulsarSchemaToKafkaSchema
+                    .getKafkaConnectSchema(sourceRecord.getSchema(), useOptionalPrimitives);
             value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
         }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index f15704de4d0..51ab20b910c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -100,6 +100,12 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
     private boolean collapsePartitionedTopics = false;
 
+    @FieldDoc(
+            defaultValue = "false",
+            help = "Pulsar schema does not contain information whether the Schema is optional, Kafka's does. \n"
+                    + "This provides a way to force all primitive schemas to be optional for Kafka. \n")
+    private boolean useOptionalPrimitives = false;
+
     public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
index d6cce4befa4..d61db4bbaa0 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -116,11 +116,15 @@ public class PulsarSchemaToKafkaSchema {
     }
 
     private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
+    private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToOptionalKafkaSchema;
     private static final ImmutableSet<String> kafkaLogicalSchemas;
     private static final AvroData avroData = new AvroData(1000);
     private static final Cache<byte[], Schema> schemaCache =
             CacheBuilder.newBuilder().maximumSize(10000)
                     .expireAfterAccess(30, TimeUnit.MINUTES).build();
+    private static final Cache<Schema, Schema> optionalSchemaCache =
+            CacheBuilder.newBuilder().maximumSize(1000)
+                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
 
     static {
         pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
@@ -135,6 +139,17 @@ public class PulsarSchemaToKafkaSchema {
                 .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
                 .put(SchemaType.DATE, Date.SCHEMA)
                 .build();
+        pulsarSchemaTypeToOptionalKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
+                .put(SchemaType.BOOLEAN, Schema.OPTIONAL_BOOLEAN_SCHEMA)
+                .put(SchemaType.INT8, Schema.OPTIONAL_INT8_SCHEMA)
+                .put(SchemaType.INT16, Schema.OPTIONAL_INT16_SCHEMA)
+                .put(SchemaType.INT32, Schema.OPTIONAL_INT32_SCHEMA)
+                .put(SchemaType.INT64, Schema.OPTIONAL_INT64_SCHEMA)
+                .put(SchemaType.FLOAT, Schema.OPTIONAL_FLOAT32_SCHEMA)
+                .put(SchemaType.DOUBLE, Schema.OPTIONAL_FLOAT64_SCHEMA)
+                .put(SchemaType.STRING, Schema.OPTIONAL_STRING_SCHEMA)
+                .put(SchemaType.BYTES, Schema.OPTIONAL_BYTES_SCHEMA)
+                .build();
         kafkaLogicalSchemas = ImmutableSet.<String>builder()
                 .add(Timestamp.LOGICAL_NAME)
                 .add(Date.LOGICAL_NAME)
@@ -155,12 +170,33 @@ public class PulsarSchemaToKafkaSchema {
         return parser.parse(schemaJson);
     }
 
-    public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
-        Schema s = getKafkaConnectSchema(pulsarSchema);
-        return new OptionalForcingSchema(s);
+    public static Schema makeOptional(Schema s) {
+        if (s == null || s.isOptional()) {
+            return s;
+        }
+
+        String logicalSchemaName = s.name();
+        if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
+            return s;
+        }
+
+        try {
+            return optionalSchemaCache.get(s, () -> new OptionalForcingSchema(s));
+        } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
+            String msg = "Failed to create optional schema for " + s;
+            log.error(msg);
+            throw new IllegalStateException(msg, ee);
+        }
     }
 
-    public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+    public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
+                                               boolean useOptionalPrimitives) {
+        return makeOptional(getKafkaConnectSchema(pulsarSchema, useOptionalPrimitives));
+
+    }
+
+    public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
+                                               boolean useOptionalPrimitives) {
         if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
             throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
         }
@@ -193,6 +229,11 @@ public class PulsarSchemaToKafkaSchema {
             throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName);
         }
 
+        if (useOptionalPrimitives
+                && pulsarSchemaTypeToOptionalKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
+            return pulsarSchemaTypeToOptionalKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
+        }
+
         if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
             return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
         }
@@ -201,8 +242,10 @@ public class PulsarSchemaToKafkaSchema {
             return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
                 if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                     KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
-                    return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
-                                    getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
+                    return SchemaBuilder.map(
+                            makeOptional(getKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives)),
+                            makeOptional(getKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives)))
+                                .optional()
                                 .build();
                 }
                 org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 775b8f2bee2..a9792f3498f 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -162,7 +162,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         }
     }
 
-    private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
+    final private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
 
     private Path file;
     private Map<String, Object> props;
@@ -797,7 +797,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
                 .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
-                .getKafkaConnectSchema(schema);
+                .getKafkaConnectSchema(schema, true);
+
+        Assert.assertFalse(kafkaSchema.isOptional());
 
         java.util.Date date = getDateFromString("12/30/1999 11:12:13");
         Object connectData = KafkaConnectData
@@ -815,7 +817,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
                 .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
-                .getKafkaConnectSchema(schema);
+                .getKafkaConnectSchema(schema, true);
+
+        Assert.assertFalse(kafkaSchema.isOptional());
 
         java.util.Date date = getDateFromString("01/01/1970 11:12:13");
         Object connectData = KafkaConnectData
@@ -833,7 +837,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
                 .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
-                .getKafkaConnectSchema(schema);
+                .getKafkaConnectSchema(schema, true);
+
+        Assert.assertFalse(kafkaSchema.isOptional());
 
         java.util.Date date = getDateFromString("12/31/2022 00:00:00");
         Object connectData = KafkaConnectData
@@ -854,7 +860,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
                 .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
-                .getKafkaConnectSchema(schema);
+                .getKafkaConnectSchema(schema, true);
+
+        Assert.assertFalse(kafkaSchema.isOptional());
 
         Object connectData = KafkaConnectData
                 .getKafkaConnectData(Decimal.fromLogical(kafkaSchema, BigDecimal.valueOf(100L, 10)), kafkaSchema);
@@ -874,11 +882,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
                 getGenericRecord(value, pulsarAvroSchema));
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
-                .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
+                .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema), false);
 
-        Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);
-
-        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+        Assert.assertTrue(kafkaSchema.isOptional());
+        Assert.assertTrue(kafkaSchema.keySchema().isOptional());
+        Assert.assertTrue(kafkaSchema.valueSchema().isOptional());
     }
 
     @Test
@@ -990,7 +998,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
-                .getKafkaConnectSchema(pulsarAvroSchema);
+                .getKafkaConnectSchema(pulsarAvroSchema, false);
+        Assert.assertFalse(kafkaSchema.isOptional());
 
         Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
 
@@ -999,6 +1008,18 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         Object jsonNode = pojoAsJsonNode(pojo);
         connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
         org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+
+        kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(pulsarAvroSchema, true);
+        Assert.assertFalse(kafkaSchema.isOptional());
+
+        connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+
+        jsonNode = pojoAsJsonNode(pojo);
+        connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
     }
 
     private JsonNode pojoAsJsonNode(Object pojo) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index ecf0633f588..d4de626e06f 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.math.BigInteger;
@@ -39,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 /**
  * Test the conversion of PulsarSchema To KafkaSchema\.
@@ -132,101 +135,134 @@ public class PulsarSchemaToKafkaSchemaTest {
         String[] stringArr;
     }
 
-    @Test
-    public void bytesSchemaTest() {
+    @DataProvider(name = "useOptionalPrimitives")
+    public static Object[][] useOptionalPrimitives() {
+        return new Object[][] {
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void bytesSchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
 
         kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void stringSchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void stringSchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRING);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void booleanSchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void booleanSchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BOOLEAN);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void int8SchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void int8SchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT8);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void int16SchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void int16SchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT16);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void int32SchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void int32SchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT32);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void int64SchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void int64SchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT64);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void float32SchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void float32SchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT32);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void float64SchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void float64SchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE, useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT64);
+        assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
     }
 
-    @Test
-    public void kvBytesSchemaTest() {
+    @Test(dataProvider = "useOptionalPrimitives")
+    public void kvBytesSchemaTest(boolean useOptionalPrimitives) {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES());
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES(), useOptionalPrimitives);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP);
         assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
         assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+        assertTrue(kafkaSchema.isOptional());
+
+        // key and value are always optional
+        assertTrue(kafkaSchema.keySchema().isOptional());
+        assertTrue(kafkaSchema.valueSchema().isOptional());
     }
 
     @Test
     public void kvBytesIntSchemaTests() {
         Schema pulsarKvSchema = KeyValueSchemaImpl.of(Schema.STRING, Schema.INT64);
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema, false);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP);
         assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.STRING);
         assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.INT64);
+        assertTrue(kafkaSchema.isOptional());
+
+        // key and value are always optional
+        assertTrue(kafkaSchema.keySchema().isOptional());
+        assertTrue(kafkaSchema.valueSchema().isOptional());
     }
 
     @Test
     public void avroSchemaTest() {
         AvroSchema<StructWithAnnotations> pulsarAvroSchema = AvroSchema.of(StructWithAnnotations.class);
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, false);
+        org.apache.kafka.connect.data.Schema kafkaSchemaOpt =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, true);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
         assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
         for (String name: STRUCT_FIELDS) {
             assertEquals(kafkaSchema.field(name).name(), name);
+            // set by avro schema
+            assertEquals(kafkaSchema.field(name).schema().isOptional(),
+                    kafkaSchemaOpt.field(name).schema().isOptional());
         }
     }
 
@@ -234,11 +270,16 @@ public class PulsarSchemaToKafkaSchemaTest {
     public void avroComplexSchemaTest() {
         AvroSchema<ComplexStruct> pulsarAvroSchema = AvroSchema.of(ComplexStruct.class);
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, false);
+        org.apache.kafka.connect.data.Schema kafkaSchemaOpt =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, true);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
         assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
         for (String name: COMPLEX_STRUCT_FIELDS) {
             assertEquals(kafkaSchema.field(name).name(), name);
+            // set by avro schema
+            assertEquals(kafkaSchema.field(name).schema().isOptional(),
+                    kafkaSchemaOpt.field(name).schema().isOptional());
         }
     }
 
@@ -250,11 +291,16 @@ public class PulsarSchemaToKafkaSchemaTest {
                 .withAlwaysAllowNull(false)
                 .build());
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, false);
+        org.apache.kafka.connect.data.Schema kafkaSchemaOpt =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, true);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
         assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
         for (String name: STRUCT_FIELDS) {
             assertEquals(kafkaSchema.field(name).name(), name);
+            // set by schema
+            assertEquals(kafkaSchema.field(name).schema().isOptional(),
+                    kafkaSchemaOpt.field(name).schema().isOptional());
         }
     }
 
@@ -266,11 +312,27 @@ public class PulsarSchemaToKafkaSchemaTest {
                         .withAlwaysAllowNull(false)
                         .build());
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, false);
         assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
         assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
         for (String name: COMPLEX_STRUCT_FIELDS) {
             assertEquals(kafkaSchema.field(name).name(), name);
+            assertFalse(kafkaSchema.field(name).schema().isOptional());
+        }
+
+        kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, true);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+        for (String name: COMPLEX_STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+            assertFalse(kafkaSchema.field(name).schema().isOptional());
+
+            if (kafkaSchema.field(name).schema().type().isPrimitive()) {
+                // false because .withAlwaysAllowNull(false), avroschema values are used
+                assertFalse(kafkaSchema.field(name).schema().isOptional(),
+                        kafkaSchema.field(name).schema().type().getName());
+            }
         }
     }
 
@@ -308,39 +370,40 @@ public class PulsarSchemaToKafkaSchemaTest {
     @Test
     public void dateSchemaTest() {
         org.apache.kafka.connect.data.Schema kafkaSchema =
-                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE);
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE, true);
         assertEquals(kafkaSchema.type(), Date.SCHEMA.type());
+        assertFalse(kafkaSchema.isOptional());
     }
 
     // not supported schemas below:
     @Test(expectedExceptions = IllegalStateException.class)
     public void timeSchemaTest() {
-        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME);
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME, false);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
     public void timestampSchemaTest() {
-        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP);
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP, false);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
     public void instantSchemaTest() {
-        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT);
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT, false);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
     public void localDateSchemaTest() {
-        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE);
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE, false);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
     public void localTimeSchemaTest() {
-        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME);
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME, false);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
     public void localDatetimeSchemaTest() {
-        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME);
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME, false);
     }
 
 }
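
For context, a brief sketch (illustrative only, not part of the patch) of enabling the new flag
through the sink's YAML config, which PulsarKafkaConnectSinkConfig.load(...) reads; the property
name matches the useOptionalPrimitives field added above, and the file name here is hypothetical.

    // my-kafka-sink-config.yaml (hypothetical file) would contain, among other settings:
    //   useOptionalPrimitives: true
    PulsarKafkaConnectSinkConfig cfg = PulsarKafkaConnectSinkConfig.load("my-kafka-sink-config.yaml");
    // Defaults to false, preserving the existing non-optional mapping.
    boolean forceOptional = cfg.isUseOptionalPrimitives();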
