This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac41cfe35f3 [feature][connector] JDBC sinks: support upsert and row
deletion (#16448)
ac41cfe35f3 is described below
commit ac41cfe35f3c3e34c0064e7a6f06fddd029d16f0
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Jul 14 14:37:15 2022 +0200
[feature][connector] JDBC sinks: support upsert and row deletion (#16448)
---
pom.xml | 2 +-
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 123 +++++---
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 88 ++++--
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 29 ++
.../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 11 +-
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 18 +-
.../pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java | 9 +
.../pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java | 9 +
.../pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java | 9 +
.../org/apache/pulsar/io/jdbc/JdbcUtilsTest.java | 91 +++---
.../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 317 +++++++++++++++++----
.../org/apache/pulsar/io/jdbc/SqliteUtils.java | 16 +-
site2/docs/io-jdbc-sink.md | 3 +
13 files changed, 558 insertions(+), 167 deletions(-)
diff --git a/pom.xml b/pom.xml
index d1b78b5c597..37cb0e02d69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,7 +158,7 @@ flexible messaging model and an intuitive client
API.</description>
<joda.version>2.10.5</joda.version>
<jclouds.version>2.5.0</jclouds.version>
<guice.version>5.1.0</guice.version>
- <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
+ <sqlite-jdbc.version>3.36.0.3</sqlite-jdbc.version>
<mysql-jdbc.version>8.0.11</mysql-jdbc.version>
<postgresql-jdbc.version>42.3.3</postgresql-jdbc.version>
<clickhouse-jdbc.version>0.3.2</clickhouse-jdbc.version>
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index e5efb15362a..17cfe4cd97b 100644
---
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.io.jdbc;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import java.sql.PreparedStatement;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -44,37 +44,28 @@ import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
public abstract class BaseJdbcAutoSchemaSink extends
JdbcAbstractSink<GenericObject> {
@Override
- public void bindValue(PreparedStatement statement,
- Record<GenericObject> message, String action) throws
Exception {
- final GenericObject record = message.getValue();
- Function<String, Object> recordValueGetter;
- if (message.getSchema() != null && message.getSchema() instanceof
KeyValueSchema) {
- KeyValueSchema<GenericObject, GenericObject> keyValueSchema =
(KeyValueSchema) message.getSchema();
-
- final org.apache.pulsar.client.api.Schema<GenericObject> keySchema
= keyValueSchema.getKeySchema();
- final org.apache.pulsar.client.api.Schema<GenericObject>
valueSchema = keyValueSchema.getValueSchema();
- KeyValue<GenericObject, GenericObject> keyValue =
- (KeyValue<GenericObject, GenericObject>)
record.getNativeObject();
-
- final GenericObject key = keyValue.getKey();
- final GenericObject value = keyValue.getValue();
-
- Map<String, Object> data = new HashMap<>();
- fillKeyValueSchemaData(keySchema, key, data);
- fillKeyValueSchemaData(valueSchema, value, data);
- recordValueGetter = (k) -> data.get(k);
- } else {
- recordValueGetter = (key) -> ((GenericRecord)
record).getField(key);
- }
+ public String generateUpsertQueryStatement() {
+ throw new IllegalStateException("UPSERT not supported");
+ }
- List<ColumnId> columns = Lists.newArrayList();
- if (action == null || action.equals(INSERT)) {
- columns = tableDefinition.getColumns();
- } else if (action.equals(DELETE)){
- columns.addAll(tableDefinition.getKeyColumns());
- } else if (action.equals(UPDATE)){
- columns.addAll(tableDefinition.getNonKeyColumns());
- columns.addAll(tableDefinition.getKeyColumns());
+ @Override
+ public void bindValue(PreparedStatement statement, Mutation mutation)
throws Exception {
+ final List<ColumnId> columns = new ArrayList<>();
+ switch (mutation.getType()) {
+ case INSERT:
+ columns.addAll(tableDefinition.getColumns());
+ break;
+ case UPSERT:
+ columns.addAll(tableDefinition.getColumns());
+ columns.addAll(tableDefinition.getNonKeyColumns());
+ break;
+ case UPDATE:
+ columns.addAll(tableDefinition.getNonKeyColumns());
+ columns.addAll(tableDefinition.getKeyColumns());
+ break;
+ case DELETE:
+ columns.addAll(tableDefinition.getKeyColumns());
+ break;
}
int index = 1;
@@ -82,10 +73,10 @@ public abstract class BaseJdbcAutoSchemaSink extends
JdbcAbstractSink<GenericObj
String colName = columnId.getName();
int colType = columnId.getType();
if (log.isDebugEnabled()) {
- log.debug("colName: {} colType: {}", colName, colType);
+ log.debug("getting value for column: {} type: {}", colName,
colType);
}
try {
- Object obj = recordValueGetter.apply(colName);
+ Object obj = mutation.getValues().apply(colName);
if (obj != null) {
setColumnValue(statement, index++, obj);
} else {
@@ -105,6 +96,66 @@ public abstract class BaseJdbcAutoSchemaSink extends
JdbcAbstractSink<GenericObj
}
}
+ @Override
+ public Mutation createMutation(Record<GenericObject> message) {
+ final GenericObject record = message.getValue();
+ Function<String, Object> recordValueGetter;
+ MutationType mutationType = null;
+ if (message.getSchema() != null && message.getSchema() instanceof
KeyValueSchema) {
+ KeyValueSchema<GenericObject, GenericObject> keyValueSchema =
(KeyValueSchema) message.getSchema();
+
+ final org.apache.pulsar.client.api.Schema<GenericObject> keySchema
= keyValueSchema.getKeySchema();
+ final org.apache.pulsar.client.api.Schema<GenericObject>
valueSchema = keyValueSchema.getValueSchema();
+ KeyValue<GenericObject, GenericObject> keyValue =
+ (KeyValue<GenericObject, GenericObject>)
record.getNativeObject();
+
+ final GenericObject key = keyValue.getKey();
+ final GenericObject value = keyValue.getValue();
+
+ boolean isDelete = false;
+ if (value == null) {
+ switch (jdbcSinkConfig.getNullValueAction()) {
+ case DELETE:
+ isDelete = true;
+ break;
+ case FAIL:
+ throw new IllegalArgumentException("Got record with
value NULL with nullValueAction=FAIL");
+ default:
+ break;
+ }
+ }
+ Map<String, Object> data = new HashMap<>();
+ fillKeyValueSchemaData(keySchema, key, data);
+ if (isDelete) {
+ mutationType = MutationType.DELETE;
+ } else {
+ fillKeyValueSchemaData(valueSchema, value, data);
+ }
+ recordValueGetter = (k) -> data.get(k);
+ } else {
+ recordValueGetter = (key) -> ((GenericRecord)
record).getField(key);
+ }
+ String action = message.getProperties().get(ACTION_PROPERTY);
+ if (action != null) {
+ mutationType = MutationType.valueOf(action);
+ } else if (mutationType == null) {
+ switch (jdbcSinkConfig.getInsertMode()) {
+ case INSERT:
+ mutationType = MutationType.INSERT;
+ break;
+ case UPSERT:
+ mutationType = MutationType.UPSERT;
+ break;
+ case UPDATE:
+ mutationType = MutationType.UPDATE;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown insert mode: "
+ jdbcSinkConfig.getInsertMode());
+ }
+ }
+ return new Mutation(mutationType, recordValueGetter);
+ }
+
private static void setColumnNull(PreparedStatement statement, int index,
int type) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Setting column value to null, statement: {}, index:
{}", statement.toString(), index);
@@ -163,6 +214,9 @@ public abstract class BaseJdbcAutoSchemaSink extends
JdbcAbstractSink<GenericObj
private static void
fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject>
schema,
GenericObject record,
Map<String, Object> data) {
+ if (record == null) {
+ return;
+ }
switch (schema.getSchemaInfo().getType()) {
case JSON:
final JsonNode jsonNode = (JsonNode) record.getNativeObject();
@@ -190,6 +244,9 @@ public abstract class BaseJdbcAutoSchemaSink extends
JdbcAbstractSink<GenericObj
@VisibleForTesting
static Object convertAvroField(Object avroValue, Schema schema) {
+ if (avroValue == null) {
+ return null;
+ }
switch (schema.getType()) {
case NULL:
case INT:
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 0fba57dcb29..8c8febb6d31 100644
---
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -31,6 +31,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
@@ -43,7 +46,7 @@ import org.apache.pulsar.io.core.SinkContext;
@Slf4j
public abstract class JdbcAbstractSink<T> implements Sink<T> {
// ----- Runtime fields
- private JdbcSinkConfig jdbcSinkConfig;
+ protected JdbcSinkConfig jdbcSinkConfig;
@Getter
private Connection connection;
private String jdbcUrl;
@@ -52,13 +55,11 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
private JdbcUtils.TableId tableId;
private PreparedStatement insertStatement;
private PreparedStatement updateStatement;
+ private PreparedStatement upsertStatement;
private PreparedStatement deleteStatement;
- protected static final String ACTION = "ACTION";
- protected static final String INSERT = "INSERT";
- protected static final String UPDATE = "UPDATE";
- protected static final String DELETE = "DELETE";
+ protected static final String ACTION_PROPERTY = "ACTION";
protected JdbcUtils.TableDefinition tableDefinition;
@@ -122,12 +123,15 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
}
tableDefinition = JdbcUtils.getTableDefinition(connection, tableId,
keyList, nonKeyList);
- insertStatement = JdbcUtils.buildInsertStatement(connection,
JdbcUtils.buildInsertSql(tableDefinition));
+ insertStatement = JdbcUtils.buildInsertStatement(connection,
generateInsertQueryStatement());
+ if (jdbcSinkConfig.getInsertMode() ==
JdbcSinkConfig.InsertMode.UPSERT) {
+ upsertStatement = JdbcUtils.buildInsertStatement(connection,
generateUpsertQueryStatement());
+ }
if (!nonKeyList.isEmpty()) {
- updateStatement = JdbcUtils.buildUpdateStatement(connection,
JdbcUtils.buildUpdateSql(tableDefinition));
+ updateStatement = JdbcUtils.buildUpdateStatement(connection,
generateUpdateQueryStatement());
}
if (!keyList.isEmpty()) {
- deleteStatement = JdbcUtils.buildDeleteStatement(connection,
JdbcUtils.buildDeleteSql(tableDefinition));
+ deleteStatement = JdbcUtils.buildDeleteStatement(connection,
generateDeleteQueryStatement());
}
}
@@ -136,6 +140,18 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
if (connection != null && !connection.getAutoCommit()) {
connection.commit();
}
+ if (insertStatement != null) {
+ insertStatement.close();
+ }
+ if (updateStatement != null) {
+ updateStatement.close();
+ }
+ if (upsertStatement != null) {
+ upsertStatement.close();
+ }
+ if (deleteStatement != null) {
+ deleteStatement.close();
+ }
if (flushExecutor != null) {
flushExecutor.shutdown();
flushExecutor = null;
@@ -159,10 +175,40 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
}
}
+ public String generateInsertQueryStatement() {
+ return JdbcUtils.buildInsertSql(tableDefinition);
+ }
+
+ public String generateUpdateQueryStatement() {
+ return JdbcUtils.buildUpdateSql(tableDefinition);
+ }
+
+ public abstract String generateUpsertQueryStatement();
+
+ public String generateDeleteQueryStatement() {
+ return JdbcUtils.buildDeleteSql(tableDefinition);
+ }
+
// bind value with a PreparedStetement
public abstract void bindValue(
PreparedStatement statement,
- Record<T> message, String action) throws Exception;
+ Mutation mutation) throws Exception;
+
+ public abstract Mutation createMutation(Record<T> message);
+
+ @Data
+ @AllArgsConstructor
+ protected static class Mutation {
+ private MutationType type;
+ private Function<String, Object> values;
+ }
+ protected enum MutationType {
+ INSERT,
+ UPDATE,
+ UPSERT,
+ DELETE
+ }
+
private void flush() {
// if not in flushing state, do flush, else return;
@@ -187,42 +233,44 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
try {
// bind each record value
for (Record<T> record : swapList) {
- String action = record.getProperties().get(ACTION);
- if (action == null) {
- action = INSERT;
- }
- switch (action) {
+ final Mutation mutation = createMutation(record);
+ switch (mutation.getType()) {
case DELETE:
- bindValue(deleteStatement, record, action);
+ bindValue(deleteStatement, mutation);
count += 1;
deleteStatement.execute();
break;
case UPDATE:
- bindValue(updateStatement, record, action);
+ bindValue(updateStatement, mutation);
count += 1;
updateStatement.execute();
break;
case INSERT:
- bindValue(insertStatement, record, action);
+ bindValue(insertStatement, mutation);
count += 1;
insertStatement.execute();
break;
+ case UPSERT:
+ bindValue(upsertStatement, mutation);
+ count += 1;
+ upsertStatement.execute();
+ break;
default:
String msg = String.format(
"Unsupported action %s, can be one of %s,
or not set which indicate %s",
- action, Arrays.asList(INSERT, UPDATE,
DELETE), INSERT);
+ mutation.getType(),
Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
}
}
connection.commit();
swapList.forEach(Record::ack);
} catch (Exception e) {
- log.error("Got exception ", e);
+ log.error("Got exception ", e.getMessage(), e);
swapList.forEach(Record::fail);
}
if (swapList.size() != count) {
- log.error("Update count {} not match total number of records
{}", count, swapList.size());
+ log.error("Update count {} not match total number of records
{}", count, swapList.size());
}
// finish flush
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index acd35047b01..89e48980858 100644
---
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -86,6 +86,35 @@ public class JdbcSinkConfig implements Serializable {
)
private int batchSize = 200;
+ @FieldDoc(
+ required = false,
+ defaultValue = "INSERT",
+ help = "If it is configured as UPSERT, the sink will use upsert
semantics rather than "
+ + "plain INSERT/UPDATE statements. Upsert semantics refer
to atomically adding a new row or "
+ + "updating the existing row if there is a primary key
constraint violation, "
+ + "which provides idempotence."
+ )
+ private InsertMode insertMode = InsertMode.INSERT;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "FAIL",
+ help = "How to handle records with null values, possible options
are DELETE or FAIL."
+ )
+ private NullValueAction nullValueAction = NullValueAction.FAIL;
+
+ public enum InsertMode {
+ INSERT,
+ UPSERT,
+ UPDATE;
+ }
+
+ public enum NullValueAction {
+ FAIL,
+ DELETE
+ }
+
+
public static JdbcSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index 96a39016c0b..637de0be96f 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -198,6 +198,13 @@ public class JdbcUtils {
builder.append("UPDATE ");
builder.append(table.tableId.getTableName());
builder.append(" SET ");
+ StringJoiner setJoiner = buildUpdateSqlSetPart(table);
+ builder.append(setJoiner);
+ builder.append(combationWhere(table.keyColumns));
+ return builder.toString();
+ }
+
+ public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
StringJoiner setJoiner = new StringJoiner(",");
table.nonKeyColumns.forEach((columnId) ->{
@@ -205,9 +212,7 @@ public class JdbcUtils {
equals.add(columnId.getName()).add("? ");
setJoiner.add(equals.toString());
});
- builder.append(setJoiner.toString());
- builder.append(combationWhere(table.keyColumns));
- return builder.toString();
+ return setJoiner;
}
public static PreparedStatement buildUpdateStatement(Connection
connection, String updateSQL) throws SQLException {
diff --git
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index de31466466d..459468facf2 100644
---
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -93,20 +93,20 @@ public class BaseJdbcAutoSchemaSinkTest {
@Test(expectedExceptions = UnsupportedOperationException.class,
expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
public void testNotSupportedAvroTypesBytes() {
- BaseJdbcAutoSchemaSink.convertAvroField(null,
createFieldAndGetSchema((builder) ->
+ BaseJdbcAutoSchemaSink.convertAvroField(new Object(),
createFieldAndGetSchema((builder) ->
builder.name("field").type().bytesType().noDefault()));
}
@Test(expectedExceptions = UnsupportedOperationException.class,
expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
public void testNotSupportedAvroTypesFixed() {
- BaseJdbcAutoSchemaSink.convertAvroField(null,
createFieldAndGetSchema((builder) ->
+ BaseJdbcAutoSchemaSink.convertAvroField(new Object(),
createFieldAndGetSchema((builder) ->
builder.name("field").type().fixed("fix").size(16).noDefault()));
}
@Test(expectedExceptions = UnsupportedOperationException.class,
expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
public void testNotSupportedAvroTypesRecord() {
- BaseJdbcAutoSchemaSink.convertAvroField(null,
createFieldAndGetSchema((builder) ->
+ BaseJdbcAutoSchemaSink.convertAvroField(new Object(),
createFieldAndGetSchema((builder) ->
builder.name("field").type()
.record("myrecord").fields()
.name("f1").type().intType().noDefault()
@@ -116,7 +116,7 @@ public class BaseJdbcAutoSchemaSinkTest {
@Test(expectedExceptions = UnsupportedOperationException.class,
expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
public void testNotSupportedAvroTypesMap() {
- BaseJdbcAutoSchemaSink.convertAvroField(null,
createFieldAndGetSchema((builder) ->
+ BaseJdbcAutoSchemaSink.convertAvroField(new Object(),
createFieldAndGetSchema((builder) ->
builder.name("field").type().map().values().stringType().noDefault()));
}
@@ -124,11 +124,19 @@ public class BaseJdbcAutoSchemaSinkTest {
@Test(expectedExceptions = UnsupportedOperationException.class,
expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
public void testNotSupportedAvroTypesArray() {
- BaseJdbcAutoSchemaSink.convertAvroField(null,
createFieldAndGetSchema((builder) ->
+ BaseJdbcAutoSchemaSink.convertAvroField(new Object(),
createFieldAndGetSchema((builder) ->
builder.name("field").type().array().items().stringType().noDefault()));
}
+ @Test
+ public void testConvertAvroNullValue() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(null,
createFieldAndGetSchema((builder) ->
+ builder.name("field").type().stringType().noDefault()));
+ Assert.assertNull(converted);
+ }
+
+
private Schema
createFieldAndGetSchema(Function<SchemaBuilder.FieldAssembler<Schema>,
SchemaBuilder.FieldAssembler<Schema>> consumer) {
final SchemaBuilder.FieldAssembler<Schema> record =
SchemaBuilder.record("record")
diff --git
a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index 3fc45bb3f7b..5c232db1aa0 100644
---
a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++
b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -29,4 +30,12 @@ import org.apache.pulsar.io.core.annotations.IOType;
)
public class MariadbJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
+ @Override
+ public String generateUpsertQueryStatement() {
+ final String keys =
tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+ .collect(Collectors.joining(","));
+ return JdbcUtils.buildInsertSql(tableDefinition)
+ + "ON DUPLICATE KEY UPDATE " +
JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
+ }
+
}
diff --git
a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index 9ed122eb3bb..d29e3e3aaca 100644
---
a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++
b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -29,4 +30,12 @@ import org.apache.pulsar.io.core.annotations.IOType;
)
public class PostgresJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
+ @Override
+ public String generateUpsertQueryStatement() {
+ final String keys =
tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+ .collect(Collectors.joining(","));
+ return JdbcUtils.buildInsertSql(tableDefinition)
+ + " ON CONFLICT(" + keys + ") "
+ + "DO UPDATE SET " +
JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
+ }
}
diff --git
a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
index 8cfd3f0962e..76655397003 100644
---
a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
+++
b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -29,4 +30,12 @@ import org.apache.pulsar.io.core.annotations.IOType;
)
public class SqliteJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
+ @Override
+ public String generateUpsertQueryStatement() {
+ final String keys =
tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+ .collect(Collectors.joining(","));
+ return JdbcUtils.buildInsertSql(tableDefinition)
+ + " ON CONFLICT(" + keys + ") "
+ + "DO UPDATE SET " +
JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
+ }
}
diff --git
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
index b3ea296bf6f..deb2fb88aa8 100644
---
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
+++
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
@@ -69,53 +69,54 @@ public class JdbcUtilsTest {
"PRIMARY KEY (firstName, lastName));"
);
- Connection connection = sqliteUtils.getConnection();
+ try (Connection connection = sqliteUtils.getConnection(true);) {
- // Test getTableId
- log.info("verify getTableId");
- TableId id = JdbcUtils.getTableId(connection, tableName);
- Assert.assertEquals(id.getTableName(), tableName);
+ // Test getTableId
+ log.info("verify getTableId");
+ TableId id = JdbcUtils.getTableId(connection, tableName);
+ Assert.assertEquals(id.getTableName(), tableName);
- // Test get getTableDefinition
- log.info("verify getTableDefinition");
- List<String> keyList = Lists.newArrayList();
- keyList.add("firstName");
- keyList.add("lastName");
- List<String> nonKeyList = Lists.newArrayList();
- nonKeyList.add("age");
- nonKeyList.add("long");
- TableDefinition table = JdbcUtils.getTableDefinition(connection, id,
keyList, nonKeyList);
- Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
- Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
- Assert.assertEquals(table.getColumns().get(2).getName(), "age");
- Assert.assertEquals(table.getColumns().get(2).getTypeName(),
"INTEGER");
- Assert.assertEquals(table.getColumns().get(7).getName(), "float");
- Assert.assertEquals(table.getColumns().get(7).getTypeName(),
"NUMERIC");
- Assert.assertEquals(table.getKeyColumns().get(0).getName(),
"firstName");
- Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(),
"TEXT");
- Assert.assertEquals(table.getKeyColumns().get(1).getName(),
"lastName");
- Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(),
"TEXT");
- Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), "age");
- Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(),
"INTEGER");
- Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), "long");
- Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(),
"INTEGER");
- // Test get getTableDefinition
- log.info("verify buildInsertSql");
- String expctedInsertStatement = "INSERT INTO " + tableName +
- "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)"
+
- " VALUES(?,?,?,?,?,?,?,?,?,?)";
- String insertStatement = JdbcUtils.buildInsertSql(table);
- Assert.assertEquals(insertStatement, expctedInsertStatement);
- log.info("verify buildUpdateSql");
- String expectedUpdateStatement = "UPDATE " + tableName +
- " SET age=? ,long=? WHERE firstName=? AND lastName=?";
- String updateStatement = JdbcUtils.buildUpdateSql(table);
- Assert.assertEquals(updateStatement, expectedUpdateStatement);
- log.info("verify buildDeleteSql");
- String expectedDeleteStatement = "DELETE FROM " + tableName +
- " WHERE firstName=? AND lastName=?";
- String deleteStatement = JdbcUtils.buildDeleteSql(table);
- Assert.assertEquals(deleteStatement, expectedDeleteStatement);
+ // Test get getTableDefinition
+ log.info("verify getTableDefinition");
+ List<String> keyList = Lists.newArrayList();
+ keyList.add("firstName");
+ keyList.add("lastName");
+ List<String> nonKeyList = Lists.newArrayList();
+ nonKeyList.add("age");
+ nonKeyList.add("long");
+ TableDefinition table = JdbcUtils.getTableDefinition(connection,
id, keyList, nonKeyList);
+ Assert.assertEquals(table.getColumns().get(0).getName(),
"firstName");
+ Assert.assertEquals(table.getColumns().get(0).getTypeName(),
"TEXT");
+ Assert.assertEquals(table.getColumns().get(2).getName(), "age");
+ Assert.assertEquals(table.getColumns().get(2).getTypeName(),
"INTEGER");
+ Assert.assertEquals(table.getColumns().get(7).getName(), "float");
+ Assert.assertEquals(table.getColumns().get(7).getTypeName(),
"NUMERIC");
+ Assert.assertEquals(table.getKeyColumns().get(0).getName(),
"firstName");
+ Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(),
"TEXT");
+ Assert.assertEquals(table.getKeyColumns().get(1).getName(),
"lastName");
+ Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(),
"TEXT");
+ Assert.assertEquals(table.getNonKeyColumns().get(0).getName(),
"age");
+ Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(),
"INTEGER");
+ Assert.assertEquals(table.getNonKeyColumns().get(1).getName(),
"long");
+ Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(),
"INTEGER");
+ // Test get getTableDefinition
+ log.info("verify buildInsertSql");
+ String expctedInsertStatement = "INSERT INTO " + tableName +
+
"(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
+ " VALUES(?,?,?,?,?,?,?,?,?,?)";
+ String insertStatement = JdbcUtils.buildInsertSql(table);
+ Assert.assertEquals(insertStatement, expctedInsertStatement);
+ log.info("verify buildUpdateSql");
+ String expectedUpdateStatement = "UPDATE " + tableName +
+ " SET age=? ,long=? WHERE firstName=? AND lastName=?";
+ String updateStatement = JdbcUtils.buildUpdateSql(table);
+ Assert.assertEquals(updateStatement, expectedUpdateStatement);
+ log.info("verify buildDeleteSql");
+ String expectedDeleteStatement = "DELETE FROM " + tableName +
+ " WHERE firstName=? AND lastName=?";
+ String deleteStatement = JdbcUtils.buildDeleteSql(table);
+ Assert.assertEquals(deleteStatement, expectedDeleteStatement);
+ }
}
}
diff --git
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 1ce3bc5b353..cb1ca844249 100644
---
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -19,8 +19,20 @@
package org.apache.pulsar.io.jdbc;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.util.Utf8;
@@ -47,22 +59,12 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Jdbc Sink test
*/
@Slf4j
public class SqliteJdbcSinkTest {
- private final SqliteUtils sqliteUtils = new
SqliteUtils(getClass().getSimpleName());
+ private final SqliteUtils sqliteUtils = new
SqliteUtils(UUID.randomUUID().toString());
private BaseJdbcAutoSchemaSink jdbcSink;
private final String tableName = "TestOpenAndWriteSink";
@@ -409,14 +411,87 @@ public class SqliteJdbcSinkTest {
Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) ->
{}), 0);
}
- @DataProvider(name = "schemaType")
+ private static class MockKeyValueGenericRecord implements
Record<GenericObject> { // test double: a KEY_VALUE record whose payload can be swapped and whose ack()/fail() calls are counted
+
+ public MockKeyValueGenericRecord(Schema<KeyValue<GenericRecord,
GenericRecord>> keyValueSchema) {
+ this.keyValueSchema = keyValueSchema;
+ }
+
+ int ackCount; // incremented by each ack() call
+ int failCount; // incremented by each fail() call
+ AtomicReference<KeyValue> keyValueHolder = new AtomicReference<>(); // payload handed out by getNativeObject(); holds null until setKeyValue() is called
+ Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema; // returned unchanged by getSchema()
+
+ private final GenericObject genericObject = new GenericObject() { // wrapper returned by getValue()
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return keyValueHolder.get(); // read on every call, so a later setKeyValue() is visible here
+ }
+ };
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.of("topic"); // fixed placeholder topic name
+ }
+
+ @Override
+ public org.apache.pulsar.client.api.Schema getSchema() {
+ return keyValueSchema;
+ }
+
+ @Override
+ public GenericObject getValue() {
+ return genericObject;
+ }
+
+ public void setKeyValue(KeyValue kv) { // replaces the payload returned through getValue().getNativeObject()
+ keyValueHolder.set(kv);
+ }
+
+ @Override
+ public void ack() {
+ ackCount++;
+ }
+
+ @Override
+ public void fail() {
+ failCount++;
+ }
+ }
+
+ @Data
+ @AllArgsConstructor
+ static class InsertModeTestConfig { // one row of the "insertModes" data provider: a schema type paired with an insert mode
+ SchemaType schemaType;
+ JdbcSinkConfig.InsertMode insertMode;
+ }
+
+ @DataProvider(name = "insertModes")
public Object[] schemaType() {
- return new Object[]{SchemaType.JSON, SchemaType.AVRO};
+ List<SchemaType> schemas = Arrays.asList(SchemaType.JSON,
SchemaType.AVRO);
+ JdbcSinkConfig.InsertMode[] insertModes =
JdbcSinkConfig.InsertMode.values();
+ Object[] result = new Object[schemas.size() * insertModes.length]; // one slot per (schema type, insert mode) combination
+ int i = 0;
+ for (SchemaType schema : schemas) {
+ for (JdbcSinkConfig.InsertMode insertMode : insertModes) {
+ result[i++] = new InsertModeTestConfig(schema, insertMode); // each element becomes the single argument of one test invocation
+ }
+ }
+ return result;
}
- @Test(dataProvider = "schemaType")
- public void testKeyValueSchema(SchemaType schemaType) throws Exception {
+ @Test(dataProvider = "insertModes")
+ public void testInsertMode(InsertModeTestConfig config) throws Exception {
+ jdbcSink.close();
+ final JdbcSinkConfig.InsertMode insertMode = config.getInsertMode();
+ final SchemaType schemaType = config.getSchemaType();
+
+ final String tableName = "kvtable_insertmode";
RecordSchemaBuilder keySchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
keySchemaBuilder.field("key").type(SchemaType.STRING).optional().defaultValue(null);
GenericSchema<GenericRecord> keySchema =
Schema.generic(keySchemaBuilder.build(schemaType));
@@ -427,6 +502,7 @@ public class SqliteJdbcSinkTest {
RecordSchemaBuilder valueSchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
valueSchemaBuilder.field("stringutf8").type(SchemaType.STRING).optional().defaultValue(null);
+
valueSchemaBuilder.field("nulltext").type(SchemaType.STRING).optional().defaultValue(null);
valueSchemaBuilder.field("int").type(SchemaType.INT32).optional().defaultValue(null);
valueSchemaBuilder.field("bool").type(SchemaType.BOOLEAN).optional().defaultValue(null);
valueSchemaBuilder.field("double").type(SchemaType.DOUBLE).optional().defaultValue(null);
@@ -434,7 +510,9 @@ public class SqliteJdbcSinkTest {
valueSchemaBuilder.field("long").type(SchemaType.INT64).optional().defaultValue(null);
GenericSchema<GenericRecord> valueSchema =
Schema.generic(valueSchemaBuilder.build(schemaType));
- GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
+ Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+ MockKeyValueGenericRecord genericObjectRecord = new
MockKeyValueGenericRecord(keyValueSchema);
+ genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord,
valueSchema.newRecordBuilder()
.set("string", "thestring")
.set("stringutf8", schemaType == SchemaType.AVRO ? new
Utf8("thestringutf8"): "thestringutf8")
.set("int", Integer.MAX_VALUE)
@@ -442,40 +520,10 @@ public class SqliteJdbcSinkTest {
.set("double", Double.MAX_VALUE)
.set("float", Float.MAX_VALUE)
.set("long", Long.MIN_VALUE)
- .build();
-
- Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
- KeyValue<GenericRecord, GenericRecord> keyValue = new
KeyValue<>(keyGenericRecord, valueGenericRecord);
- GenericObject genericObject = new GenericObject() {
- @Override
- public SchemaType getSchemaType() {
- return SchemaType.KEY_VALUE;
- }
-
- @Override
- public Object getNativeObject() {
- return keyValue;
- }
- };
- Record<GenericObject> genericObjectRecord = new Record<>() {
- @Override
- public Optional<String> getTopicName() {
- return Optional.of("topic");
- }
-
- @Override
- public org.apache.pulsar.client.api.Schema getSchema() {
- return keyValueSchema;
- }
+ .build()));
- @Override
- public GenericObject getValue() {
- return genericObject;
- }
- };
- jdbcSink.close();
sqliteUtils.createTable(
- "CREATE TABLE kvtable (" +
+ "CREATE TABLE " + tableName + " (" +
" key TEXT," +
" int INTEGER," +
" string TEXT," +
@@ -491,18 +539,19 @@ public class SqliteJdbcSinkTest {
Map<String, Object> conf = Maps.newHashMap();
conf.put("jdbcUrl", jdbcUrl);
- conf.put("tableName", "kvtable");
+ conf.put("tableName", tableName);
conf.put("key", "key");
conf.put("nonKey",
"long,int,double,float,bool,nulltext,string,stringutf8");
// change batchSize to 1, to flush on each write.
conf.put("batchSize", 1);
+ conf.put("insertMode", insertMode.toString());
try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new
SqliteJdbcAutoSchemaSink();) {
kvSchemaJdbcSink.open(conf, null);
kvSchemaJdbcSink.write(genericObjectRecord);
Awaitility.await().untilAsserted(() -> {
final int count = sqliteUtils.select("select
int,string,stringutf8,bool,double,float," +
- "long,nulltext from kvtable where key='mykey'",
(resultSet) -> {
+ "long,nulltext from " + tableName + " where
key='mykey'", (resultSet) -> {
int index = 1;
Assert.assertEquals(resultSet.getInt(index++),
Integer.MAX_VALUE);
Assert.assertEquals(resultSet.getString(index++),
"thestring");
@@ -513,9 +562,175 @@ public class SqliteJdbcSinkTest {
Assert.assertEquals(resultSet.getLong(index++),
Long.MIN_VALUE);
Assert.assertNull(resultSet.getString(index++));
});
- Assert.assertEquals(count, 1);
+ if (insertMode == JdbcSinkConfig.InsertMode.INSERT
+ || insertMode == JdbcSinkConfig.InsertMode.UPSERT) {
+ Assert.assertEquals(count, 1);
+ } else {
+ Assert.assertEquals(count, 0);
+ }
+ });
+ }
+
+ if (insertMode == JdbcSinkConfig.InsertMode.UPDATE) {
+ conf.put("insertMode", JdbcSinkConfig.InsertMode.INSERT);
+ try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new
SqliteJdbcAutoSchemaSink();) {
+ kvSchemaJdbcSink.open(conf, null);
+ kvSchemaJdbcSink.write(genericObjectRecord);
+ Awaitility.await().untilAsserted(() -> {
+ final int count = sqliteUtils.select("select key from " +
tableName + " where key='mykey'", (resultSet) -> {
+ });
+ Assert.assertEquals(count, 1);
+ });
+ }
+ }
+ conf.put("insertMode", insertMode.toString());
+ try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new
SqliteJdbcAutoSchemaSink();) {
+ kvSchemaJdbcSink.open(conf, null);
+ genericObjectRecord = new
MockKeyValueGenericRecord(keyValueSchema);
+ genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord,
valueSchema.newRecordBuilder()
+ .set("string", "thestring_updated")
+ .set("stringutf8", schemaType == SchemaType.AVRO ?
+ new Utf8("thestringutf8_updated"):
"thestringutf8_updated")
+ .set("int", Integer.MIN_VALUE)
+ .set("bool", false)
+ .set("double", Double.MIN_VALUE)
+ .set("float", Float.MIN_VALUE)
+ .set("long", Long.MAX_VALUE)
+ .set("nulltext", "nomore-null")
+ .build()));
+ kvSchemaJdbcSink.write(genericObjectRecord);
+
+ Awaitility.await().untilAsserted(() -> {
+ final int count = sqliteUtils.select("select
int,string,stringutf8,bool,double,float," +
+ "long,nulltext from " + tableName + " where
key='mykey'", (resultSet) -> {
+ int index = 1;
+ if (insertMode == JdbcSinkConfig.InsertMode.INSERT) {
+ Assert.assertEquals(resultSet.getInt(index++),
Integer.MAX_VALUE);
+ Assert.assertEquals(resultSet.getString(index++),
"thestring");
+ Assert.assertEquals(resultSet.getString(index++),
"thestringutf8");
+ Assert.assertEquals(resultSet.getBoolean(index++),
true);
+ Assert.assertEquals(resultSet.getDouble(index++),
Double.MAX_VALUE);
+ Assert.assertEquals(resultSet.getFloat(index++),
Float.MAX_VALUE);
+ Assert.assertEquals(resultSet.getLong(index++),
Long.MIN_VALUE);
+ Assert.assertNull(resultSet.getString(index++));
+ } else {
+ Assert.assertEquals(resultSet.getInt(index++),
Integer.MIN_VALUE);
+ Assert.assertEquals(resultSet.getString(index++),
"thestring_updated");
+ Assert.assertEquals(resultSet.getString(index++),
"thestringutf8_updated");
+ Assert.assertEquals(resultSet.getBoolean(index++),
false);
+ Assert.assertEquals(resultSet.getDouble(index++),
Double.MIN_VALUE);
+ Assert.assertEquals(resultSet.getFloat(index++),
Float.MIN_VALUE);
+ Assert.assertEquals(resultSet.getLong(index++),
Long.MAX_VALUE);
+ Assert.assertEquals(resultSet.getString(index++),
"nomore-null");
+ }
+ });
+ if (insertMode == JdbcSinkConfig.InsertMode.INSERT) {
+ Assert.assertEquals(count, 1);
+ } else if (insertMode == JdbcSinkConfig.InsertMode.UPSERT) {
+ Assert.assertEquals(count, 1);
+ } else {
+ Assert.assertEquals(count, 1);
+ }
});
}
}
+ @Data
+ @AllArgsConstructor
+ static class NullValueActionTestConfig { // one row of the "nullValueActions" data provider: a schema type paired with a null-value action
+ SchemaType schemaType;
+ JdbcSinkConfig.NullValueAction nullValueAction;
+ }
+ @DataProvider(name = "nullValueActions")
+ public Object[] nullValueActions() { // builds every (schema type, null-value action) pair for testNullValueAction
+ List<SchemaType> schemas = Arrays.asList(SchemaType.JSON,
SchemaType.AVRO);
+ JdbcSinkConfig.NullValueAction[] nullValueActions =
JdbcSinkConfig.NullValueAction.values();
+ Object[] result = new Object[schemas.size() * nullValueActions.length]; // one slot per combination
+ int i = 0;
+ for (SchemaType schema : schemas) {
+ for (JdbcSinkConfig.NullValueAction nullValueAction :
nullValueActions) {
+ result[i++] = new NullValueActionTestConfig(schema,
nullValueAction);
+ }
+ }
+ return result;
+ }
+
+ @Test(dataProvider = "nullValueActions")
+ public void testNullValueAction(NullValueActionTestConfig config) throws
Exception { // writes two keyed records plus one null-valued record in UPSERT mode and checks the resulting rows for each nullValueAction
+ jdbcSink.close();
+ final SchemaType schemaType = config.getSchemaType();
+ final JdbcSinkConfig.NullValueAction nullValueAction =
config.getNullValueAction();
+ final String tableName = "kvtable_nullvalue";
+
+ RecordSchemaBuilder keySchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+
keySchemaBuilder.field("key").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> keySchema =
Schema.generic(keySchemaBuilder.build(schemaType));
+ GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
+ .set("key", "mykey")
+ .build();
+
+ RecordSchemaBuilder valueSchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+
valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> valueSchema =
Schema.generic(valueSchemaBuilder.build(schemaType));
+
+ Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+ MockKeyValueGenericRecord genericObjectRecord = new
MockKeyValueGenericRecord(keyValueSchema);
+
+ sqliteUtils.createTable("CREATE TABLE " + tableName + " (" +
+ " key TEXT," +
+ " string TEXT," +
+ "PRIMARY KEY (key));"
+ );
+ String jdbcUrl = sqliteUtils.sqliteUri();
+
+ Map<String, Object> conf = Maps.newHashMap();
+ conf.put("jdbcUrl", jdbcUrl);
+ conf.put("tableName", tableName);
+ conf.put("key", "key");
+ conf.put("nonKey", "string");
+ conf.put("batchSize", 3); // equals the number of write() calls below; presumably so all three records are flushed as one batch - TODO confirm
+ conf.put("insertMode", JdbcSinkConfig.InsertMode.UPSERT.toString());
+ conf.put("nullValueAction", nullValueAction.toString());
+ try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new
SqliteJdbcAutoSchemaSink();) {
+ kvSchemaJdbcSink.open(conf, null);
+ genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord,
valueSchema.newRecordBuilder()
+ .set("string", "thestring")
+ .build()));
+ kvSchemaJdbcSink.write(genericObjectRecord); // record 1: key "mykey" with a value
+
+ genericObjectRecord = new
MockKeyValueGenericRecord(keyValueSchema);
+ genericObjectRecord.setKeyValue(new
KeyValue<>(keySchema.newRecordBuilder()
+ .set("key", "mykey2")
+ .build(), valueSchema.newRecordBuilder()
+ .set("string", "thestring")
+ .build()));
+ kvSchemaJdbcSink.write(genericObjectRecord); // record 2: key "mykey2" with a value
+
+ genericObjectRecord = new
MockKeyValueGenericRecord(keyValueSchema);
+ genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord,
null));
+ kvSchemaJdbcSink.write(genericObjectRecord); // record 3: key "mykey" again, this time with a null value
+
+ Awaitility.await().untilAsserted(() -> {
+ final int count = sqliteUtils.select("select key,string from "
+ tableName, (resultSet) -> {
+ int index = 1;
+ final String key = resultSet.getString(index++);
+ final String value = resultSet.getString(index++);
+ if (key.equals("mykey2")) {
+ Assert.assertEquals(value, "thestring");
+ } else {
+ throw new IllegalStateException(); // any row other than "mykey2" is unexpected in both modes
+ }
+ });
+ if (nullValueAction == JdbcSinkConfig.NullValueAction.DELETE) {
+ Assert.assertEquals(count, 1); // only the "mykey2" row is expected to remain
+ } else if (nullValueAction ==
JdbcSinkConfig.NullValueAction.FAIL) {
+ Assert.assertEquals(count, 0); // no rows at all are expected
+ } else {
+ throw new IllegalStateException(); // guards against an enum value this test does not cover
+ }
+ });
+
+ }
+ }
+
}
diff --git
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
index bb855d64bf1..f8cdf76f7d3 100644
---
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
+++
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
@@ -49,9 +49,9 @@ public final class SqliteUtils {
private final Path dbPath;
- private Connection connection;
-
- public Connection getConnection() {
+ public Connection getConnection(boolean autocommit) throws SQLException { // obtains a connection from DriverManager on each call instead of returning a shared field
+ Connection connection = DriverManager.getConnection(sqliteUri());
+ connection.setAutoCommit(autocommit); // auto-commit mode is chosen by the caller
return connection;
}
@@ -65,12 +65,9 @@ public final class SqliteUtils {
public void setUp() throws SQLException, IOException {
Files.deleteIfExists(dbPath);
- connection = DriverManager.getConnection(sqliteUri());
- connection.setAutoCommit(false);
}
public void tearDown() throws SQLException, IOException {
- connection.close();
Files.deleteIfExists(dbPath);
}
@@ -91,7 +88,8 @@ public final class SqliteUtils {
public int select(final String query, final ResultSetReadCallback
callback) throws SQLException {
int count = 0;
- try (Statement stmt = connection.createStatement()) {
+ try (Connection connection = getConnection(true);
+ Statement stmt = connection.createStatement()) {
try (ResultSet rs = stmt.executeQuery(query)) {
while (rs.next()) {
callback.read(rs);
@@ -103,9 +101,9 @@ public final class SqliteUtils {
}
public void execute(String sql) throws SQLException {
- try (Statement stmt = connection.createStatement()) {
+ try (Connection connection = getConnection(true); // auto-commit connection, closed by try-with-resources, so no explicit commit() follows
+ Statement stmt = connection.createStatement()) {
stmt.executeUpdate(sql);
- connection.commit();
}
}
diff --git a/site2/docs/io-jdbc-sink.md b/site2/docs/io-jdbc-sink.md
index 87b06c544c4..abc028132c3 100644
--- a/site2/docs/io-jdbc-sink.md
+++ b/site2/docs/io-jdbc-sink.md
@@ -8,6 +8,7 @@ The JDBC sink connectors allow pulling messages from Pulsar
topics
and persists the messages to ClickHouse, MariaDB, PostgreSQL, and SQLite.
> Currently, INSERT, DELETE and UPDATE operations are supported.
+> SQLite, MariaDB and PostgreSQL also support UPSERT operations and idempotent
writes.
## Configuration
@@ -25,6 +26,8 @@ The configuration of all JDBC sink connectors has the
following properties.
| `key` | String | false | " " (empty string) | A comma-separated
list containing the fields used in `where` condition of updating and deleting
events. |
| `timeoutMs` | int | false | 500 | The JDBC operation
timeout in milliseconds.
|
| `batchSize` | int | false | 200 | The batch size of
updates made to the database.
|
+| `insertMode` | enum(INSERT, UPSERT, UPDATE) | false | INSERT | When set to
UPSERT, the sink uses upsert semantics rather than plain INSERT/UPDATE
statements. Upsert semantics refer to atomically adding a new row or updating
the existing row if there is a primary key constraint violation, which provides
idempotence. |
+| `nullValueAction` | enum(FAIL, DELETE) | false | FAIL | How to handle
records with NULL values. Possible options are `DELETE` or `FAIL`. |
### Example for ClickHouse