This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a3bf4e8a42c [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)
a3bf4e8a42c is described below
commit a3bf4e8a42c84a0ee5b4c45b50d48daed0b3de0c
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Mar 29 08:33:27 2024 +0800
[improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)
---
pulsar-io/jdbc/core/pom.xml | 7 +++++
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 5 ++++
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 +++++++++++++++++
.../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 31 +++++++++++++++-------
4 files changed, 59 insertions(+), 9 deletions(-)
diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 586307e8b86..0232a699680 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -71,6 +71,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 36c36740919..3655688c0f3 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
@@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
}
recordValueGetter = (k) -> data.get(k);
} else {
+ SchemaType schemaType = message.getSchema().getSchemaInfo().getType();
+ if (schemaType.isPrimitive()) {
+ throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType);
+ }
recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
}
String action = message.getProperties().get(ACTION_PROPERTY);
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index b15eb832242..c088dd3c42c 100644
--- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -22,6 +22,10 @@ import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.functions.api.Record;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest {
return consumer.apply(record).endRecord().getFields().get(0).schema();
}
+ @Test(expectedExceptions = UnsupportedOperationException.class,
+ expectedExceptionsMessageRegExp = "Primitive schema is not supported.*")
+ @SuppressWarnings("unchecked")
+ public void testNotSupportPrimitiveSchema() {
+ BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {};
+ AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+ autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
+ Record<? extends GenericObject> record = new Record<GenericRecord>() {
+ @Override
+ public org.apache.pulsar.client.api.Schema<GenericRecord> getSchema() {
+ return autoConsumeSchema;
+ }
+
+ @Override
+ public GenericRecord getValue() {
+ return null;
+ }
+ };
+ baseJdbcAutoSchemaSink.createMutation((Record<GenericObject>) record);
+ }
+
}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d9ed4cbd442..ca01615bef1 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.KeyValue;
@@ -282,9 +283,12 @@ public class SqliteJdbcSinkTest {
}
@Test
+ @SuppressWarnings("unchecked")
public void TestUpdateAction() throws Exception {
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+ autoConsumeSchema.setSchema(schema);
Foo updateObj = new Foo();
updateObj.setField1("ValueOfField3");
@@ -292,10 +296,11 @@ public class SqliteJdbcSinkTest {
updateObj.setField3(4);
byte[] updateBytes = schema.encode(updateObj);
- Message<GenericObject> updateMessage = mock(MessageImpl.class);
+ Message<GenericRecord> updateMessage = mock(MessageImpl.class);
CompletableFuture<Boolean> future = new CompletableFuture<>();
- Record<GenericObject> updateRecord = PulsarRecord.<GenericObject>builder()
+ Record<? extends GenericObject> updateRecord = PulsarRecord.<GenericRecord>builder()
.message(updateMessage)
+ .schema(autoConsumeSchema)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
.build();
@@ -312,7 +317,7 @@ public class SqliteJdbcSinkTest {
updateMessage.getValue().toString(),
updateRecord.getValue().toString());
- jdbcSink.write(updateRecord);
+ jdbcSink.write((Record<GenericObject>) updateRecord);
future.get(1, TimeUnit.SECONDS);
// value has been written to db, read it out and verify.
@@ -325,18 +330,22 @@ public class SqliteJdbcSinkTest {
}
@Test
+ @SuppressWarnings("unchecked")
public void TestDeleteAction() throws Exception {
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+ autoConsumeSchema.setSchema(schema);
Foo deleteObj = new Foo();
deleteObj.setField3(5);
byte[] deleteBytes = schema.encode(deleteObj);
- Message<GenericObject> deleteMessage = mock(MessageImpl.class);
+ Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
CompletableFuture<Boolean> future = new CompletableFuture<>();
- Record<GenericObject> deleteRecord = PulsarRecord.<GenericObject>builder()
+ Record<? extends GenericObject> deleteRecord = PulsarRecord.<GenericRecord>builder()
.message(deleteMessage)
+ .schema(autoConsumeSchema)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
.build();
@@ -352,7 +361,7 @@ public class SqliteJdbcSinkTest {
deleteMessage.getValue().toString(),
deleteRecord.getValue().toString());
- jdbcSink.write(deleteRecord);
+ jdbcSink.write((Record<GenericObject>) deleteRecord);
future.get(1, TimeUnit.SECONDS);
// value has been written to db, read it out and verify.
@@ -848,17 +857,21 @@ public class SqliteJdbcSinkTest {
}
}
+ @SuppressWarnings("unchecked")
private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
CompletableFuture<Boolean> future) {
- Message<GenericObject> insertMessage = mock(MessageImpl.class);
+ Message<GenericRecord> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+ AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+ autoConsumeSchema.setSchema(schema);
byte[] insertBytes = schema.encode(record);
- Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
+ Record<? extends GenericObject> insertRecord = PulsarRecord.<GenericRecord>builder()
.message(insertMessage)
.topicName("fake_topic_name")
+ .schema(autoConsumeSchema)
.ackFunction(() -> future.complete(true))
.failFunction(() -> future.complete(false))
.build();
@@ -866,7 +879,7 @@ public class SqliteJdbcSinkTest {
genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
when(insertMessage.getProperties()).thenReturn(actionProperties);
- return insertRecord;
+ return (Record<GenericObject>) insertRecord;
}
}