This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new a503efc826e [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) a503efc826e is described below commit a503efc826e60b8e26f9792aeb45223374b8f4ca Author: Baodi Shi <ba...@apache.org> AuthorDate: Fri Mar 29 08:33:27 2024 +0800 [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) --- pulsar-io/jdbc/core/pom.xml | 7 +++++ .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 5 ++++ .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 +++++++++++++++++ .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 31 +++++++++++++++------- 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml index 3f44a062fb9..0322f6e11f1 100644 --- a/pulsar-io/jdbc/core/pom.xml +++ b/pulsar-io/jdbc/core/pom.xml @@ -71,6 +71,13 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-client-original</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java index 36c36740919..3655688c0f3 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import 
org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; @@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj } recordValueGetter = (k) -> data.get(k); } else { + SchemaType schemaType = message.getSchema().getSchemaInfo().getType(); + if (schemaType.isPrimitive()) { + throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType); + } recordValueGetter = (key) -> ((GenericRecord) record).getField(key); } String action = message.getProperties().get(ACTION_PROPERTY); diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java index b15eb832242..c088dd3c42c 100644 --- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java +++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java @@ -22,6 +22,10 @@ import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.util.Utf8; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.functions.api.Record; import org.testng.Assert; import org.testng.annotations.Test; @@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest { return consumer.apply(record).endRecord().getFields().get(0).schema(); } + @Test(expectedExceptions = UnsupportedOperationException.class, + expectedExceptionsMessageRegExp = "Primitive schema is not supported.*") + @SuppressWarnings("unchecked") + public void testNotSupportPrimitiveSchema() { + BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {}; + AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + 
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING); + Record<? extends GenericObject> record = new Record<GenericRecord>() { + @Override + public org.apache.pulsar.client.api.Schema<GenericRecord> getSchema() { + return autoConsumeSchema; + } + + @Override + public GenericRecord getValue() { + return null; + } + }; + baseJdbcAutoSchemaSink.createMutation((Record<GenericObject>) record); + } + } \ No newline at end of file diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index d9ed4cbd442..ca01615bef1 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.KeyValue; @@ -282,9 +283,12 @@ public class SqliteJdbcSinkTest { } @Test + @SuppressWarnings("unchecked") public void TestUpdateAction() throws Exception { AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); + AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(schema); Foo updateObj = new Foo(); updateObj.setField1("ValueOfField3"); @@ -292,10 +296,11 @@ public class SqliteJdbcSinkTest { updateObj.setField3(4); byte[] updateBytes = schema.encode(updateObj); - Message<GenericObject> updateMessage = mock(MessageImpl.class); + Message<GenericRecord> updateMessage = 
mock(MessageImpl.class); CompletableFuture<Boolean> future = new CompletableFuture<>(); - Record<GenericObject> updateRecord = PulsarRecord.<GenericObject>builder() + Record<? extends GenericObject> updateRecord = PulsarRecord.<GenericRecord>builder() .message(updateMessage) + .schema(autoConsumeSchema) .topicName("fake_topic_name") .ackFunction(() -> future.complete(null)) .build(); @@ -312,7 +317,7 @@ public class SqliteJdbcSinkTest { updateMessage.getValue().toString(), updateRecord.getValue().toString()); - jdbcSink.write(updateRecord); + jdbcSink.write((Record<GenericObject>) updateRecord); future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. @@ -325,18 +330,22 @@ public class SqliteJdbcSinkTest { } @Test + @SuppressWarnings("unchecked") public void TestDeleteAction() throws Exception { AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); + AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(schema); Foo deleteObj = new Foo(); deleteObj.setField3(5); byte[] deleteBytes = schema.encode(deleteObj); - Message<GenericObject> deleteMessage = mock(MessageImpl.class); + Message<GenericRecord> deleteMessage = mock(MessageImpl.class); CompletableFuture<Boolean> future = new CompletableFuture<>(); - Record<GenericObject> deleteRecord = PulsarRecord.<GenericObject>builder() + Record<? extends GenericObject> deleteRecord = PulsarRecord.<GenericRecord>builder() .message(deleteMessage) + .schema(autoConsumeSchema) .topicName("fake_topic_name") .ackFunction(() -> future.complete(null)) .build(); @@ -352,7 +361,7 @@ public class SqliteJdbcSinkTest { deleteMessage.getValue().toString(), deleteRecord.getValue().toString()); - jdbcSink.write(deleteRecord); + jdbcSink.write((Record<GenericObject>) deleteRecord); future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. 
@@ -848,17 +857,21 @@ public class SqliteJdbcSinkTest { } } + @SuppressWarnings("unchecked") private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties, CompletableFuture<Boolean> future) { - Message<GenericObject> insertMessage = mock(MessageImpl.class); + Message<GenericRecord> insertMessage = mock(MessageImpl.class); GenericSchema<GenericRecord> genericAvroSchema; AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build()); + AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(schema); byte[] insertBytes = schema.encode(record); - Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder() + Record<? extends GenericObject> insertRecord = PulsarRecord.<GenericRecord>builder() .message(insertMessage) .topicName("fake_topic_name") + .schema(autoConsumeSchema) .ackFunction(() -> future.complete(true)) .failFunction(() -> future.complete(false)) .build(); @@ -866,7 +879,7 @@ public class SqliteJdbcSinkTest { genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo()); when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes)); when(insertMessage.getProperties()).thenReturn(actionProperties); - return insertRecord; + return (Record<GenericObject>) insertRecord; } }