This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1639ae2a0ec [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)
1639ae2a0ec is described below

commit 1639ae2a0ec34ce475bb813f629a3ce97a3c5e14
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Fri Mar 29 08:33:27 2024 +0800

    [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)
---
 pulsar-io/jdbc/core/pom.xml                        |  7 +++++
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java     |  5 ++++
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 +++++++++++++++++
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 31 +++++++++++++++-------
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index e1f15332a6c..7617e221105 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -71,6 +71,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 36c36740919..3655688c0f3 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
@@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
             }
             recordValueGetter = (k) -> data.get(k);
         } else {
+            SchemaType schemaType = message.getSchema().getSchemaInfo().getType();
+            if (schemaType.isPrimitive()) {
+                throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType);
+            }
             recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
         }
         String action = message.getProperties().get(ACTION_PROPERTY);
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index b15eb832242..c088dd3c42c 100644
--- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -22,6 +22,10 @@ import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.functions.api.Record;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest {
         return consumer.apply(record).endRecord().getFields().get(0).schema();
     }
 
+    @Test(expectedExceptions = UnsupportedOperationException.class,
+            expectedExceptionsMessageRegExp = "Primitive schema is not supported.*")
+    @SuppressWarnings("unchecked")
+    public void testNotSupportPrimitiveSchema() {
+        BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {};
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
+        Record<? extends GenericObject> record = new Record<GenericRecord>() {
+            @Override
+            public org.apache.pulsar.client.api.Schema<GenericRecord> getSchema() {
+                return autoConsumeSchema;
+            }
+
+            @Override
+            public GenericRecord getValue() {
+                return null;
+            }
+        };
+        baseJdbcAutoSchemaSink.createMutation((Record<GenericObject>) record);
+    }
+
 
 }
\ No newline at end of file
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d9ed4cbd442..ca01615bef1 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.schema.KeyValue;
@@ -282,9 +283,12 @@ public class SqliteJdbcSinkTest {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void TestUpdateAction() throws Exception {
 
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(schema);
 
         Foo updateObj = new Foo();
         updateObj.setField1("ValueOfField3");
@@ -292,10 +296,11 @@ public class SqliteJdbcSinkTest {
         updateObj.setField3(4);
 
         byte[] updateBytes = schema.encode(updateObj);
-        Message<GenericObject> updateMessage = mock(MessageImpl.class);
+        Message<GenericRecord> updateMessage = mock(MessageImpl.class);
         CompletableFuture<Boolean> future = new CompletableFuture<>();
-        Record<GenericObject> updateRecord = PulsarRecord.<GenericObject>builder()
+        Record<? extends GenericObject> updateRecord = PulsarRecord.<GenericRecord>builder()
                 .message(updateMessage)
+                .schema(autoConsumeSchema)
                 .topicName("fake_topic_name")
                 .ackFunction(() -> future.complete(null))
                 .build();
@@ -312,7 +317,7 @@ public class SqliteJdbcSinkTest {
                 updateMessage.getValue().toString(),
                 updateRecord.getValue().toString());
 
-        jdbcSink.write(updateRecord);
+        jdbcSink.write((Record<GenericObject>) updateRecord);
         future.get(1, TimeUnit.SECONDS);
 
         // value has been written to db, read it out and verify.
@@ -325,18 +330,22 @@ public class SqliteJdbcSinkTest {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void TestDeleteAction() throws Exception {
 
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(schema);
 
         Foo deleteObj = new Foo();
         deleteObj.setField3(5);
 
         byte[] deleteBytes = schema.encode(deleteObj);
-        Message<GenericObject> deleteMessage = mock(MessageImpl.class);
+        Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
         CompletableFuture<Boolean> future = new CompletableFuture<>();
-        Record<GenericObject> deleteRecord = PulsarRecord.<GenericObject>builder()
+        Record<? extends GenericObject> deleteRecord = PulsarRecord.<GenericRecord>builder()
                 .message(deleteMessage)
+                .schema(autoConsumeSchema)
                 .topicName("fake_topic_name")
                 .ackFunction(() -> future.complete(null))
                 .build();
@@ -352,7 +361,7 @@ public class SqliteJdbcSinkTest {
                 deleteMessage.getValue().toString(),
                 deleteRecord.getValue().toString());
 
-        jdbcSink.write(deleteRecord);
+        jdbcSink.write((Record<GenericObject>) deleteRecord);
         future.get(1, TimeUnit.SECONDS);
 
         // value has been written to db, read it out and verify.
@@ -848,17 +857,21 @@ public class SqliteJdbcSinkTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
                                                         
CompletableFuture<Boolean> future) {
-        Message<GenericObject> insertMessage = mock(MessageImpl.class);
+        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
         GenericSchema<GenericRecord> genericAvroSchema;
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(schema);
 
         byte[] insertBytes = schema.encode(record);
 
-        Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
+        Record<? extends GenericObject> insertRecord = PulsarRecord.<GenericRecord>builder()
                 .message(insertMessage)
                 .topicName("fake_topic_name")
+                .schema(autoConsumeSchema)
                 .ackFunction(() -> future.complete(true))
                 .failFunction(() -> future.complete(false))
                 .build();
@@ -866,7 +879,7 @@ public class SqliteJdbcSinkTest {
         genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
         
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
         when(insertMessage.getProperties()).thenReturn(actionProperties);
-        return insertRecord;
+        return (Record<GenericObject>) insertRecord;
     }
 
 }

Reply via email to