This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new b86bda3151c [improve][io] KCA: flag to force optional primitive
schemas (#19951)
b86bda3151c is described below
commit b86bda3151ca00b3f3ee5918bb4e00870d900865
Author: Andrey Yegorov <[email protected]>
AuthorDate: Wed Mar 29 13:55:22 2023 -0700
[improve][io] KCA: flag to force optional primitive schemas (#19951)
Motivation
Kafka's schema has an "Optional" flag that is used there to validate data and
allow nulls.
Pulsar's schema does not carry such information, which makes conversion to a
Kafka schema lossy.
Modifications
Added a config parameter that lets one force primitive schemas into
optional ones.
KV schema is always optional.
Default is false, to match existing behavior.
(cherry picked from commit 55523ac8f31fd6d54aacba326edef1f53028877e)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 16 ++-
.../connect/PulsarKafkaConnectSinkConfig.java | 6 +
.../connect/schema/PulsarSchemaToKafkaSchema.java | 55 +++++++-
.../io/kafka/connect/KafkaConnectSinkTest.java | 41 ++++--
.../connect/PulsarSchemaToKafkaSchemaTest.java | 149 +++++++++++++++------
5 files changed, 203 insertions(+), 64 deletions(-)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 0f2321eaaa4..c40bcff0162 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -91,6 +91,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
protected String topicName;
+ protected boolean useOptionalPrimitives;
private boolean sanitizeTopicName = false;
// Thi is a workaround for https://github.com/apache/pulsar/issues/19922
@@ -165,6 +166,7 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
unwrapKeyValueIfAvailable =
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
collapsePartitionedTopics =
kafkaSinkConfig.isCollapsePartitionedTopics();
+ useOptionalPrimitives = kafkaSinkConfig.isUseOptionalPrimitives();
useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -447,8 +449,11 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
&& sourceRecord.getSchema().getSchemaInfo() != null
&& sourceRecord.getSchema().getSchemaInfo().getType() ==
SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema)
sourceRecord.getSchema();
- keySchema =
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
- valueSchema =
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
+ // Assume Key_Value schema's key and value are always optional
+ keySchema = PulsarSchemaToKafkaSchema
+ .getOptionalKafkaConnectSchema(kvSchema.getKeySchema(),
useOptionalPrimitives);
+ valueSchema = PulsarSchemaToKafkaSchema
+ .getOptionalKafkaConnectSchema(kvSchema.getValueSchema(),
useOptionalPrimitives);
Object nativeObject = sourceRecord.getValue().getNativeObject();
@@ -465,12 +470,13 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
} else {
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
- keySchema = Schema.BYTES_SCHEMA;
+ keySchema = useOptionalPrimitives ?
Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
} else {
key = sourceRecord.getKey().orElse(null);
- keySchema = Schema.STRING_SCHEMA;
+ keySchema = useOptionalPrimitives ?
Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
}
- valueSchema =
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
+ valueSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(sourceRecord.getSchema(),
useOptionalPrimitives);
value =
KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(),
valueSchema);
}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index f15704de4d0..51ab20b910c 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -100,6 +100,12 @@ public class PulsarKafkaConnectSinkConfig implements
Serializable {
help = "Supply kafka record with topic name without -partition-
suffix for partitioned topics.")
private boolean collapsePartitionedTopics = false;
+ @FieldDoc(
+ defaultValue = "false",
+ help = "Pulsar schema does not contain information whether the
Schema is optional, Kafka's does. \n"
+ + "This provides a way to force all primitive schemas to
be optional for Kafka. \n")
+ private boolean useOptionalPrimitives = false;
+
public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws
IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile),
PulsarKafkaConnectSinkConfig.class);
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
index d6cce4befa4..d61db4bbaa0 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -116,11 +116,15 @@ public class PulsarSchemaToKafkaSchema {
}
private static final ImmutableMap<SchemaType, Schema>
pulsarSchemaTypeToKafkaSchema;
+ private static final ImmutableMap<SchemaType, Schema>
pulsarSchemaTypeToOptionalKafkaSchema;
private static final ImmutableSet<String> kafkaLogicalSchemas;
private static final AvroData avroData = new AvroData(1000);
private static final Cache<byte[], Schema> schemaCache =
CacheBuilder.newBuilder().maximumSize(10000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
+ private static final Cache<Schema, Schema> optionalSchemaCache =
+ CacheBuilder.newBuilder().maximumSize(1000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build();
static {
pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType,
Schema>builder()
@@ -135,6 +139,17 @@ public class PulsarSchemaToKafkaSchema {
.put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
.put(SchemaType.DATE, Date.SCHEMA)
.build();
+ pulsarSchemaTypeToOptionalKafkaSchema = ImmutableMap.<SchemaType,
Schema>builder()
+ .put(SchemaType.BOOLEAN, Schema.OPTIONAL_BOOLEAN_SCHEMA)
+ .put(SchemaType.INT8, Schema.OPTIONAL_INT8_SCHEMA)
+ .put(SchemaType.INT16, Schema.OPTIONAL_INT16_SCHEMA)
+ .put(SchemaType.INT32, Schema.OPTIONAL_INT32_SCHEMA)
+ .put(SchemaType.INT64, Schema.OPTIONAL_INT64_SCHEMA)
+ .put(SchemaType.FLOAT, Schema.OPTIONAL_FLOAT32_SCHEMA)
+ .put(SchemaType.DOUBLE, Schema.OPTIONAL_FLOAT64_SCHEMA)
+ .put(SchemaType.STRING, Schema.OPTIONAL_STRING_SCHEMA)
+ .put(SchemaType.BYTES, Schema.OPTIONAL_BYTES_SCHEMA)
+ .build();
kafkaLogicalSchemas = ImmutableSet.<String>builder()
.add(Timestamp.LOGICAL_NAME)
.add(Date.LOGICAL_NAME)
@@ -155,12 +170,33 @@ public class PulsarSchemaToKafkaSchema {
return parser.parse(schemaJson);
}
- public static Schema
getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema)
{
- Schema s = getKafkaConnectSchema(pulsarSchema);
- return new OptionalForcingSchema(s);
+ public static Schema makeOptional(Schema s) {
+ if (s == null || s.isOptional()) {
+ return s;
+ }
+
+ String logicalSchemaName = s.name();
+ if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
+ return s;
+ }
+
+ try {
+ return optionalSchemaCache.get(s, () -> new
OptionalForcingSchema(s));
+ } catch (ExecutionException | UncheckedExecutionException |
ExecutionError ee) {
+ String msg = "Failed to create optional schema for " + s;
+ log.error(msg);
+ throw new IllegalStateException(msg, ee);
+ }
}
- public static Schema
getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+ public static Schema
getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
+ boolean useOptionalPrimitives) {
+ return makeOptional(getKafkaConnectSchema(pulsarSchema,
useOptionalPrimitives));
+
+ }
+
+ public static Schema
getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
+ boolean useOptionalPrimitives) {
if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is
required.", null);
}
@@ -193,6 +229,11 @@ public class PulsarSchemaToKafkaSchema {
throw new IllegalStateException("Unsupported Kafka Logical Schema
" + logicalSchemaName);
}
+ if (useOptionalPrimitives
+ &&
pulsarSchemaTypeToOptionalKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType()))
{
+ return
pulsarSchemaTypeToOptionalKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
+ }
+
if
(pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType()))
{
return
pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}
@@ -201,8 +242,10 @@ public class PulsarSchemaToKafkaSchema {
return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(),
() -> {
if (pulsarSchema.getSchemaInfo().getType() ==
SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
- return
SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
-
getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
+ return SchemaBuilder.map(
+
makeOptional(getKafkaConnectSchema(kvSchema.getKeySchema(),
useOptionalPrimitives)),
+
makeOptional(getKafkaConnectSchema(kvSchema.getValueSchema(),
useOptionalPrimitives)))
+ .optional()
.build();
}
org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 775b8f2bee2..a9792f3498f 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -162,7 +162,7 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
}
}
- private String offsetTopicName =
"persistent://my-property/my-ns/kafka-connect-sink-offset";
+ final private String offsetTopicName =
"persistent://my-property/my-ns/kafka-connect-sink-offset";
private Path file;
private Map<String, Object> props;
@@ -797,7 +797,9 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
.build());
org.apache.kafka.connect.data.Schema kafkaSchema =
PulsarSchemaToKafkaSchema
- .getKafkaConnectSchema(schema);
+ .getKafkaConnectSchema(schema, true);
+
+ Assert.assertFalse(kafkaSchema.isOptional());
java.util.Date date = getDateFromString("12/30/1999 11:12:13");
Object connectData = KafkaConnectData
@@ -815,7 +817,9 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
.build());
org.apache.kafka.connect.data.Schema kafkaSchema =
PulsarSchemaToKafkaSchema
- .getKafkaConnectSchema(schema);
+ .getKafkaConnectSchema(schema, true);
+
+ Assert.assertFalse(kafkaSchema.isOptional());
java.util.Date date = getDateFromString("01/01/1970 11:12:13");
Object connectData = KafkaConnectData
@@ -833,7 +837,9 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
.build());
org.apache.kafka.connect.data.Schema kafkaSchema =
PulsarSchemaToKafkaSchema
- .getKafkaConnectSchema(schema);
+ .getKafkaConnectSchema(schema, true);
+
+ Assert.assertFalse(kafkaSchema.isOptional());
java.util.Date date = getDateFromString("12/31/2022 00:00:00");
Object connectData = KafkaConnectData
@@ -854,7 +860,9 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
.build());
org.apache.kafka.connect.data.Schema kafkaSchema =
PulsarSchemaToKafkaSchema
- .getKafkaConnectSchema(schema);
+ .getKafkaConnectSchema(schema, true);
+
+ Assert.assertFalse(kafkaSchema.isOptional());
Object connectData = KafkaConnectData
.getKafkaConnectData(Decimal.fromLogical(kafkaSchema,
BigDecimal.valueOf(100L, 10)), kafkaSchema);
@@ -874,11 +882,11 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
getGenericRecord(value, pulsarAvroSchema));
org.apache.kafka.connect.data.Schema kafkaSchema =
PulsarSchemaToKafkaSchema
- .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema,
pulsarAvroSchema));
+ .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema,
pulsarAvroSchema), false);
- Object connectData = KafkaConnectData.getKafkaConnectData(kv,
kafkaSchema);
-
- org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema,
connectData);
+ Assert.assertTrue(kafkaSchema.isOptional());
+ Assert.assertTrue(kafkaSchema.keySchema().isOptional());
+ Assert.assertTrue(kafkaSchema.valueSchema().isOptional());
}
@Test
@@ -990,7 +998,8 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);
org.apache.kafka.connect.data.Schema kafkaSchema =
PulsarSchemaToKafkaSchema
- .getKafkaConnectSchema(pulsarAvroSchema);
+ .getKafkaConnectSchema(pulsarAvroSchema, false);
+ Assert.assertFalse(kafkaSchema.isOptional());
Object connectData = KafkaConnectData.getKafkaConnectData(value,
kafkaSchema);
@@ -999,6 +1008,18 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
Object jsonNode = pojoAsJsonNode(pojo);
connectData = KafkaConnectData.getKafkaConnectData(jsonNode,
kafkaSchema);
org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema,
connectData);
+
+ kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(pulsarAvroSchema, true);
+ Assert.assertFalse(kafkaSchema.isOptional());
+
+ connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema,
connectData);
+
+ jsonNode = pojoAsJsonNode(pojo);
+ connectData = KafkaConnectData.getKafkaConnectData(jsonNode,
kafkaSchema);
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema,
connectData);
}
private JsonNode pojoAsJsonNode(Object pojo) {
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index ecf0633f588..d4de626e06f 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.math.BigInteger;
@@ -39,6 +40,8 @@ import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
/**
* Test the conversion of PulsarSchema To KafkaSchema\.
@@ -132,101 +135,134 @@ public class PulsarSchemaToKafkaSchemaTest {
String[] stringArr;
}
- @Test
- public void bytesSchemaTest() {
+ @DataProvider(name = "useOptionalPrimitives")
+ public static Object[][] useOptionalPrimitives() {
+ return new Object[][] {
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void bytesSchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.BYTES);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
kafkaSchema =
-
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER);
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.BYTES);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void stringSchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void stringSchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.STRING);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void booleanSchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void booleanSchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.BOOLEAN);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void int8SchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void int8SchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.INT8);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void int16SchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void int16SchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.INT16);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void int32SchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void int32SchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.INT32);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void int64SchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void int64SchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.INT64);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void float32SchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void float32SchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.FLOAT32);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void float64SchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void float64SchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE,
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.FLOAT64);
+ assertEquals(useOptionalPrimitives, kafkaSchema.isOptional());
}
- @Test
- public void kvBytesSchemaTest() {
+ @Test(dataProvider = "useOptionalPrimitives")
+ public void kvBytesSchemaTest(boolean useOptionalPrimitives) {
org.apache.kafka.connect.data.Schema kafkaSchema =
-
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES());
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES(),
useOptionalPrimitives);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.MAP);
assertEquals(kafkaSchema.keySchema().type(),
org.apache.kafka.connect.data.Schema.Type.BYTES);
assertEquals(kafkaSchema.valueSchema().type(),
org.apache.kafka.connect.data.Schema.Type.BYTES);
+ assertTrue(kafkaSchema.isOptional());
+
+ // key and value are always optional
+ assertTrue(kafkaSchema.keySchema().isOptional());
+ assertTrue(kafkaSchema.valueSchema().isOptional());
}
@Test
public void kvBytesIntSchemaTests() {
Schema pulsarKvSchema = KeyValueSchemaImpl.of(Schema.STRING,
Schema.INT64);
org.apache.kafka.connect.data.Schema kafkaSchema =
-
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema);
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema, false);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.MAP);
assertEquals(kafkaSchema.keySchema().type(),
org.apache.kafka.connect.data.Schema.Type.STRING);
assertEquals(kafkaSchema.valueSchema().type(),
org.apache.kafka.connect.data.Schema.Type.INT64);
+ assertTrue(kafkaSchema.isOptional());
+
+ // key and value are always optional
+ assertTrue(kafkaSchema.keySchema().isOptional());
+ assertTrue(kafkaSchema.valueSchema().isOptional());
}
@Test
public void avroSchemaTest() {
AvroSchema<StructWithAnnotations> pulsarAvroSchema =
AvroSchema.of(StructWithAnnotations.class);
org.apache.kafka.connect.data.Schema kafkaSchema =
-
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, false);
+ org.apache.kafka.connect.data.Schema kafkaSchemaOpt =
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, true);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.STRUCT);
assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
for (String name: STRUCT_FIELDS) {
assertEquals(kafkaSchema.field(name).name(), name);
+ // set by avro schema
+ assertEquals(kafkaSchema.field(name).schema().isOptional(),
+ kafkaSchemaOpt.field(name).schema().isOptional());
}
}
@@ -234,11 +270,16 @@ public class PulsarSchemaToKafkaSchemaTest {
public void avroComplexSchemaTest() {
AvroSchema<ComplexStruct> pulsarAvroSchema =
AvroSchema.of(ComplexStruct.class);
org.apache.kafka.connect.data.Schema kafkaSchema =
-
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, false);
+ org.apache.kafka.connect.data.Schema kafkaSchemaOpt =
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, true);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.STRUCT);
assertEquals(kafkaSchema.fields().size(),
COMPLEX_STRUCT_FIELDS.size());
for (String name: COMPLEX_STRUCT_FIELDS) {
assertEquals(kafkaSchema.field(name).name(), name);
+ // set by avro schema
+ assertEquals(kafkaSchema.field(name).schema().isOptional(),
+ kafkaSchemaOpt.field(name).schema().isOptional());
}
}
@@ -250,11 +291,16 @@ public class PulsarSchemaToKafkaSchemaTest {
.withAlwaysAllowNull(false)
.build());
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema,
false);
+ org.apache.kafka.connect.data.Schema kafkaSchemaOpt =
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema,
true);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.STRUCT);
assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
for (String name: STRUCT_FIELDS) {
assertEquals(kafkaSchema.field(name).name(), name);
+ // set by schema
+ assertEquals(kafkaSchema.field(name).schema().isOptional(),
+ kafkaSchemaOpt.field(name).schema().isOptional());
}
}
@@ -266,11 +312,27 @@ public class PulsarSchemaToKafkaSchemaTest {
.withAlwaysAllowNull(false)
.build());
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema,
false);
assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.STRUCT);
assertEquals(kafkaSchema.fields().size(),
COMPLEX_STRUCT_FIELDS.size());
for (String name: COMPLEX_STRUCT_FIELDS) {
assertEquals(kafkaSchema.field(name).name(), name);
+ assertFalse(kafkaSchema.field(name).schema().isOptional());
+ }
+
+ kafkaSchema =
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema,
true);
+ assertEquals(kafkaSchema.type(),
org.apache.kafka.connect.data.Schema.Type.STRUCT);
+ assertEquals(kafkaSchema.fields().size(),
COMPLEX_STRUCT_FIELDS.size());
+ for (String name: COMPLEX_STRUCT_FIELDS) {
+ assertEquals(kafkaSchema.field(name).name(), name);
+ assertFalse(kafkaSchema.field(name).schema().isOptional());
+
+ if (kafkaSchema.field(name).schema().type().isPrimitive()) {
+ // false because .withAlwaysAllowNull(false), avroschema
values are used
+ assertFalse(kafkaSchema.field(name).schema().isOptional(),
+ kafkaSchema.field(name).schema().type().getName());
+ }
}
}
@@ -308,39 +370,40 @@ public class PulsarSchemaToKafkaSchemaTest {
@Test
public void dateSchemaTest() {
org.apache.kafka.connect.data.Schema kafkaSchema =
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE,
true);
assertEquals(kafkaSchema.type(), Date.SCHEMA.type());
+ assertFalse(kafkaSchema.isOptional());
}
// not supported schemas below:
@Test(expectedExceptions = IllegalStateException.class)
public void timeSchemaTest() {
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME, false);
}
@Test(expectedExceptions = IllegalStateException.class)
public void timestampSchemaTest() {
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP,
false);
}
@Test(expectedExceptions = IllegalStateException.class)
public void instantSchemaTest() {
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT, false);
}
@Test(expectedExceptions = IllegalStateException.class)
public void localDateSchemaTest() {
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE,
false);
}
@Test(expectedExceptions = IllegalStateException.class)
public void localTimeSchemaTest() {
- PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME);
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME,
false);
}
@Test(expectedExceptions = IllegalStateException.class)
public void localDatetimeSchemaTest() {
-
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME);
+
PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME, false);
}
}