This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a1cb0fd63db [improve][io] The JDBC connector supports JSON substructure schema (#23043)
a1cb0fd63db is described below

commit a1cb0fd63db50268a14d6bb8bbc552e3b2f34944
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Jul 21 12:11:06 2024 +0800

    [improve][io] The JDBC connector supports JSON substructure schema (#23043)
    
    (cherry picked from commit d08e2e08b43e86f27f8206c7e189234163bfd795)
---
 pulsar-io/jdbc/core/pom.xml                        | 13 ++---
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java     |  5 +-
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 68 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 028b635338d..35f8bfed430 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -44,6 +44,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
@@ -71,13 +77,6 @@
       <scope>provided</scope>
     </dependency>
 
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-original</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
   </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 3655688c0f3..c1f44cf37ef 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -33,6 +33,7 @@ import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
@@ -173,7 +174,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
 
     }
 
-    private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
+    protected void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
 
         log.debug("Setting column value, statement: {}, index: {}, value: {}", statement, index, value);
 
@@ -193,6 +194,8 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
             statement.setShort(index, (Short) value);
         } else if (value instanceof ByteString) {
             statement.setBytes(index, ((ByteString) value).toByteArray());
+        } else if (value instanceof GenericJsonRecord) {
+            statement.setString(index, ((GenericJsonRecord) value).getJsonNode().toString());
         } else {
             throw new Exception("Not supported value type, need to add it. " + value.getClass());
         }
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index c088dd3c42c..8cb6219deb8 100644
--- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -18,13 +18,23 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Function;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
 import org.apache.pulsar.functions.api.Record;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -169,4 +179,62 @@ public class BaseJdbcAutoSchemaSinkTest {
     }
 
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testSubFieldJsonArray() throws Exception {
+        BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {};
+
+        Field field = JdbcAbstractSink.class.getDeclaredField("jdbcSinkConfig");
+        field.setAccessible(true);
+        JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig();
+        jdbcSinkConfig.setNullValueAction(JdbcSinkConfig.NullValueAction.FAIL);
+        field.set(baseJdbcAutoSchemaSink, jdbcSinkConfig);
+
+        TStates tStates = new TStates("tstats", Arrays.asList(
+                new PC("brand1", "model1"),
+                new PC("brand2", "model2")
+        ));
+        org.apache.pulsar.client.api.Schema<TStates> jsonSchema = org.apache.pulsar.client.api.Schema.JSON(TStates.class);
+        GenericJsonSchema genericJsonSchema = new GenericJsonSchema(jsonSchema.getSchemaInfo());
+        byte[] encode = jsonSchema.encode(tStates);
+        GenericRecord genericRecord = genericJsonSchema.decode(encode);
+
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.JSON(TStates.class));
+        Record<? extends GenericObject> record = new Record<GenericRecord>() {
+            @Override
+            public org.apache.pulsar.client.api.Schema<GenericRecord> getSchema() {
+                return genericJsonSchema;
+            }
+
+            @Override
+            public GenericRecord getValue() {
+                return genericRecord;
+            }
+        };
+        JdbcAbstractSink.Mutation mutation = baseJdbcAutoSchemaSink.createMutation((Record<GenericObject>) record);
+        PreparedStatement mockPreparedStatement = mock(PreparedStatement.class);
+        baseJdbcAutoSchemaSink.setColumnValue(mockPreparedStatement, 0, mutation.getValues().apply("state"));
+        baseJdbcAutoSchemaSink.setColumnValue(mockPreparedStatement, 1, mutation.getValues().apply("pcList"));
+        verify(mockPreparedStatement).setString(0, "tstats");
+        verify(mockPreparedStatement).setString(1, "[{\"brand\":\"brand1\",\"model\":\"model1\"},{\"brand\":\"brand2\",\"model\":\"model2\"}]");
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class TStates {
+        public String state;
+        public List<PC> pcList;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class PC {
+        public String brand;
+        public String model;
+    }
+
+
 }
\ No newline at end of file

Reply via email to