This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new a1cb0fd63db [improve][io] The JDBC connector supports JSON
substructure schema (#23043)
a1cb0fd63db is described below
commit a1cb0fd63db50268a14d6bb8bbc552e3b2f34944
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Jul 21 12:11:06 2024 +0800
[improve][io] The JDBC connector supports JSON substructure schema (#23043)
(cherry picked from commit d08e2e08b43e86f27f8206c7e189234163bfd795)
---
pulsar-io/jdbc/core/pom.xml | 13 ++---
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 5 +-
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 68 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 8 deletions(-)
diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 028b635338d..35f8bfed430 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -44,6 +44,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -71,13 +77,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
</dependencies>
</project>
\ No newline at end of file
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 3655688c0f3..c1f44cf37ef 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -33,6 +33,7 @@ import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
@@ -173,7 +174,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
}
- private static void setColumnValue(PreparedStatement statement, int index,
Object value) throws Exception {
+ protected void setColumnValue(PreparedStatement statement, int index,
Object value) throws Exception {
log.debug("Setting column value, statement: {}, index: {}, value: {}",
statement, index, value);
@@ -193,6 +194,8 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
statement.setShort(index, (Short) value);
} else if (value instanceof ByteString) {
statement.setBytes(index, ((ByteString) value).toByteArray());
+ } else if (value instanceof GenericJsonRecord) {
+ statement.setString(index, ((GenericJsonRecord)
value).getJsonNode().toString());
} else {
throw new Exception("Not supported value type, need to add it. " +
value.getClass());
}
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index c088dd3c42c..8cb6219deb8 100644
--- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -18,13 +18,23 @@
*/
package org.apache.pulsar.io.jdbc;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.List;
import java.util.function.Function;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.Utf8;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.functions.api.Record;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -169,4 +179,62 @@ public class BaseJdbcAutoSchemaSinkTest {
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSubFieldJsonArray() throws Exception {
+ BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new
BaseJdbcAutoSchemaSink() {};
+
+ Field field =
JdbcAbstractSink.class.getDeclaredField("jdbcSinkConfig");
+ field.setAccessible(true);
+ JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig();
+ jdbcSinkConfig.setNullValueAction(JdbcSinkConfig.NullValueAction.FAIL);
+ field.set(baseJdbcAutoSchemaSink, jdbcSinkConfig);
+
+ TStates tStates = new TStates("tstats", Arrays.asList(
+ new PC("brand1", "model1"),
+ new PC("brand2", "model2")
+ ));
+ org.apache.pulsar.client.api.Schema<TStates> jsonSchema =
org.apache.pulsar.client.api.Schema.JSON(TStates.class);
+ GenericJsonSchema genericJsonSchema = new
GenericJsonSchema(jsonSchema.getSchemaInfo());
+ byte[] encode = jsonSchema.encode(tStates);
+ GenericRecord genericRecord = genericJsonSchema.decode(encode);
+
+ AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.JSON(TStates.class));
+ Record<? extends GenericObject> record = new Record<GenericRecord>() {
+ @Override
+ public org.apache.pulsar.client.api.Schema<GenericRecord>
getSchema() {
+ return genericJsonSchema;
+ }
+
+ @Override
+ public GenericRecord getValue() {
+ return genericRecord;
+ }
+ };
+ JdbcAbstractSink.Mutation mutation =
baseJdbcAutoSchemaSink.createMutation((Record<GenericObject>) record);
+ PreparedStatement mockPreparedStatement =
mock(PreparedStatement.class);
+ baseJdbcAutoSchemaSink.setColumnValue(mockPreparedStatement, 0,
mutation.getValues().apply("state"));
+ baseJdbcAutoSchemaSink.setColumnValue(mockPreparedStatement, 1,
mutation.getValues().apply("pcList"));
+ verify(mockPreparedStatement).setString(0, "tstats");
+ verify(mockPreparedStatement).setString(1,
"[{\"brand\":\"brand1\",\"model\":\"model1\"},{\"brand\":\"brand2\",\"model\":\"model2\"}]");
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ private static class TStates {
+ public String state;
+ public List<PC> pcList;
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ private static class PC {
+ public String brand;
+ public String model;
+ }
+
+
}
\ No newline at end of file