This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac41cfe35f3 [feature][connector] JDBC sinks: support upsert and row deletion (#16448)
ac41cfe35f3 is described below

commit ac41cfe35f3c3e34c0064e7a6f06fddd029d16f0
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Jul 14 14:37:15 2022 +0200

    [feature][connector] JDBC sinks: support upsert and row deletion (#16448)
---
 pom.xml                                            |   2 +-
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java     | 123 +++++---
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    |  88 ++++--
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  |  29 ++
 .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java  |  11 +-
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java |  18 +-
 .../pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java  |   9 +
 .../pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java |   9 +
 .../pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java   |   9 +
 .../org/apache/pulsar/io/jdbc/JdbcUtilsTest.java   |  91 +++---
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 317 +++++++++++++++++----
 .../org/apache/pulsar/io/jdbc/SqliteUtils.java     |  16 +-
 site2/docs/io-jdbc-sink.md                         |   3 +
 13 files changed, 558 insertions(+), 167 deletions(-)

diff --git a/pom.xml b/pom.xml
index d1b78b5c597..37cb0e02d69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,7 +158,7 @@ flexible messaging model and an intuitive client API.</description>
     <joda.version>2.10.5</joda.version>
     <jclouds.version>2.5.0</jclouds.version>
     <guice.version>5.1.0</guice.version>
-    <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
+    <sqlite-jdbc.version>3.36.0.3</sqlite-jdbc.version>
     <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
     <postgresql-jdbc.version>42.3.3</postgresql-jdbc.version>
     <clickhouse-jdbc.version>0.3.2</clickhouse-jdbc.version>
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index e5efb15362a..17cfe4cd97b 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.io.jdbc;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import java.sql.PreparedStatement;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -44,37 +44,28 @@ import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 public abstract class BaseJdbcAutoSchemaSink extends 
JdbcAbstractSink<GenericObject> {
 
     @Override
-    public void bindValue(PreparedStatement statement,
-                          Record<GenericObject> message, String action) throws 
Exception {
-        final GenericObject record = message.getValue();
-        Function<String, Object> recordValueGetter;
-        if (message.getSchema() != null && message.getSchema() instanceof 
KeyValueSchema) {
-            KeyValueSchema<GenericObject, GenericObject> keyValueSchema = 
(KeyValueSchema) message.getSchema();
-
-            final org.apache.pulsar.client.api.Schema<GenericObject> keySchema 
= keyValueSchema.getKeySchema();
-            final org.apache.pulsar.client.api.Schema<GenericObject> 
valueSchema = keyValueSchema.getValueSchema();
-            KeyValue<GenericObject, GenericObject> keyValue =
-                    (KeyValue<GenericObject, GenericObject>) 
record.getNativeObject();
-
-            final GenericObject key = keyValue.getKey();
-            final GenericObject value = keyValue.getValue();
-
-            Map<String, Object> data = new HashMap<>();
-            fillKeyValueSchemaData(keySchema, key, data);
-            fillKeyValueSchemaData(valueSchema, value, data);
-            recordValueGetter = (k) -> data.get(k);
-        } else {
-            recordValueGetter = (key) -> ((GenericRecord) 
record).getField(key);
-        }
+    public String generateUpsertQueryStatement() {
+        throw new IllegalStateException("UPSERT not supported");
+    }
 
-        List<ColumnId> columns = Lists.newArrayList();
-        if (action == null || action.equals(INSERT)) {
-            columns = tableDefinition.getColumns();
-        } else if (action.equals(DELETE)){
-            columns.addAll(tableDefinition.getKeyColumns());
-        } else if (action.equals(UPDATE)){
-            columns.addAll(tableDefinition.getNonKeyColumns());
-            columns.addAll(tableDefinition.getKeyColumns());
+    @Override
+    public void bindValue(PreparedStatement statement, Mutation mutation) 
throws Exception {
+        final List<ColumnId> columns = new ArrayList<>();
+        switch (mutation.getType()) {
+            case INSERT:
+                columns.addAll(tableDefinition.getColumns());
+                break;
+            case UPSERT:
+                columns.addAll(tableDefinition.getColumns());
+                columns.addAll(tableDefinition.getNonKeyColumns());
+                break;
+            case UPDATE:
+                columns.addAll(tableDefinition.getNonKeyColumns());
+                columns.addAll(tableDefinition.getKeyColumns());
+                break;
+            case DELETE:
+                columns.addAll(tableDefinition.getKeyColumns());
+                break;
         }
 
         int index = 1;
@@ -82,10 +73,10 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
             String colName = columnId.getName();
             int colType = columnId.getType();
             if (log.isDebugEnabled()) {
-                log.debug("colName: {} colType: {}", colName, colType);
+                log.debug("getting value for column: {} type: {}", colName, 
colType);
             }
             try {
-                Object obj = recordValueGetter.apply(colName);
+                Object obj = mutation.getValues().apply(colName);
                 if (obj != null) {
                     setColumnValue(statement, index++, obj);
                 } else {
@@ -105,6 +96,66 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
         }
     }
 
+    @Override
+    public Mutation createMutation(Record<GenericObject> message) {
+        final GenericObject record = message.getValue();
+        Function<String, Object> recordValueGetter;
+        MutationType mutationType = null;
+        if (message.getSchema() != null && message.getSchema() instanceof 
KeyValueSchema) {
+            KeyValueSchema<GenericObject, GenericObject> keyValueSchema = 
(KeyValueSchema) message.getSchema();
+
+            final org.apache.pulsar.client.api.Schema<GenericObject> keySchema 
= keyValueSchema.getKeySchema();
+            final org.apache.pulsar.client.api.Schema<GenericObject> 
valueSchema = keyValueSchema.getValueSchema();
+            KeyValue<GenericObject, GenericObject> keyValue =
+                    (KeyValue<GenericObject, GenericObject>) 
record.getNativeObject();
+
+            final GenericObject key = keyValue.getKey();
+            final GenericObject value = keyValue.getValue();
+
+            boolean isDelete = false;
+            if (value == null) {
+                switch (jdbcSinkConfig.getNullValueAction()) {
+                    case DELETE:
+                        isDelete = true;
+                        break;
+                    case FAIL:
+                        throw new IllegalArgumentException("Got record with 
value NULL with nullValueAction=FAIL");
+                    default:
+                        break;
+                }
+            }
+            Map<String, Object> data = new HashMap<>();
+            fillKeyValueSchemaData(keySchema, key, data);
+            if (isDelete) {
+                mutationType = MutationType.DELETE;
+            } else {
+                fillKeyValueSchemaData(valueSchema, value, data);
+            }
+            recordValueGetter = (k) -> data.get(k);
+        } else {
+            recordValueGetter = (key) -> ((GenericRecord) 
record).getField(key);
+        }
+        String action = message.getProperties().get(ACTION_PROPERTY);
+        if (action != null) {
+            mutationType = MutationType.valueOf(action);
+        } else if (mutationType == null) {
+            switch (jdbcSinkConfig.getInsertMode()) {
+                case INSERT:
+                    mutationType = MutationType.INSERT;
+                    break;
+                case UPSERT:
+                    mutationType = MutationType.UPSERT;
+                    break;
+                case UPDATE:
+                    mutationType = MutationType.UPDATE;
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown insert mode: " 
+ jdbcSinkConfig.getInsertMode());
+            }
+        }
+        return new Mutation(mutationType, recordValueGetter);
+    }
+
     private static void setColumnNull(PreparedStatement statement, int index, 
int type) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("Setting column value to null, statement: {}, index: 
{}", statement.toString(), index);
@@ -163,6 +214,9 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
     private static void 
fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> 
schema,
                                         GenericObject record,
                                         Map<String, Object> data) {
+        if (record == null) {
+            return;
+        }
         switch (schema.getSchemaInfo().getType()) {
             case JSON:
                 final JsonNode jsonNode = (JsonNode) record.getNativeObject();
@@ -190,6 +244,9 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
 
     @VisibleForTesting
     static Object convertAvroField(Object avroValue, Schema schema) {
+        if (avroValue == null) {
+            return null;
+        }
         switch (schema.getType()) {
             case NULL:
             case INT:
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 0fba57dcb29..8c8febb6d31 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -31,6 +31,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
@@ -43,7 +46,7 @@ import org.apache.pulsar.io.core.SinkContext;
 @Slf4j
 public abstract class JdbcAbstractSink<T> implements Sink<T> {
     // ----- Runtime fields
-    private JdbcSinkConfig jdbcSinkConfig;
+    protected JdbcSinkConfig jdbcSinkConfig;
     @Getter
     private Connection connection;
     private String jdbcUrl;
@@ -52,13 +55,11 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
     private JdbcUtils.TableId tableId;
     private PreparedStatement insertStatement;
     private PreparedStatement updateStatement;
+    private PreparedStatement upsertStatement;
     private PreparedStatement deleteStatement;
 
 
-    protected static final String ACTION = "ACTION";
-    protected static final String INSERT = "INSERT";
-    protected static final String UPDATE = "UPDATE";
-    protected static final String DELETE = "DELETE";
+    protected static final String ACTION_PROPERTY = "ACTION";
 
     protected JdbcUtils.TableDefinition tableDefinition;
 
@@ -122,12 +123,15 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
         }
 
         tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, 
keyList, nonKeyList);
-        insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+        insertStatement = JdbcUtils.buildInsertStatement(connection, 
generateInsertQueryStatement());
+        if (jdbcSinkConfig.getInsertMode() == 
JdbcSinkConfig.InsertMode.UPSERT) {
+            upsertStatement = JdbcUtils.buildInsertStatement(connection, 
generateUpsertQueryStatement());
+        }
         if (!nonKeyList.isEmpty()) {
-            updateStatement = JdbcUtils.buildUpdateStatement(connection, 
JdbcUtils.buildUpdateSql(tableDefinition));
+            updateStatement = JdbcUtils.buildUpdateStatement(connection, 
generateUpdateQueryStatement());
         }
         if (!keyList.isEmpty()) {
-            deleteStatement = JdbcUtils.buildDeleteStatement(connection, 
JdbcUtils.buildDeleteSql(tableDefinition));
+            deleteStatement = JdbcUtils.buildDeleteStatement(connection, 
generateDeleteQueryStatement());
         }
     }
 
@@ -136,6 +140,18 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
         if (connection != null && !connection.getAutoCommit()) {
             connection.commit();
         }
+        if (insertStatement != null) {
+            insertStatement.close();
+        }
+        if (updateStatement != null) {
+            updateStatement.close();
+        }
+        if (upsertStatement != null) {
+            upsertStatement.close();
+        }
+        if (deleteStatement != null) {
+            deleteStatement.close();
+        }
         if (flushExecutor != null) {
             flushExecutor.shutdown();
             flushExecutor = null;
@@ -159,10 +175,40 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
         }
     }
 
+    public String generateInsertQueryStatement() {
+        return JdbcUtils.buildInsertSql(tableDefinition);
+    }
+
+    public String generateUpdateQueryStatement() {
+        return JdbcUtils.buildUpdateSql(tableDefinition);
+    }
+
+    public abstract String generateUpsertQueryStatement();
+
+    public String generateDeleteQueryStatement() {
+        return JdbcUtils.buildDeleteSql(tableDefinition);
+    }
+
     // bind value with a PreparedStetement
     public abstract void bindValue(
         PreparedStatement statement,
-        Record<T> message, String action) throws Exception;
+        Mutation mutation) throws Exception;
+
+    public abstract Mutation createMutation(Record<T> message);
+
+    @Data
+    @AllArgsConstructor
+    protected static class Mutation {
+        private MutationType type;
+        private Function<String, Object> values;
+    }
+    protected enum MutationType {
+        INSERT,
+        UPDATE,
+        UPSERT,
+        DELETE
+    }
+
 
     private void flush() {
         // if not in flushing state, do flush, else return;
@@ -187,42 +233,44 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
             try {
                 // bind each record value
                 for (Record<T> record : swapList) {
-                    String action = record.getProperties().get(ACTION);
-                    if (action == null) {
-                        action = INSERT;
-                    }
-                    switch (action) {
+                    final Mutation mutation = createMutation(record);
+                    switch (mutation.getType()) {
                         case DELETE:
-                            bindValue(deleteStatement, record, action);
+                            bindValue(deleteStatement, mutation);
                             count += 1;
                             deleteStatement.execute();
                             break;
                         case UPDATE:
-                            bindValue(updateStatement, record, action);
+                            bindValue(updateStatement, mutation);
                             count += 1;
                             updateStatement.execute();
                             break;
                         case INSERT:
-                            bindValue(insertStatement, record, action);
+                            bindValue(insertStatement, mutation);
                             count += 1;
                             insertStatement.execute();
                             break;
+                        case UPSERT:
+                            bindValue(upsertStatement, mutation);
+                            count += 1;
+                            upsertStatement.execute();
+                            break;
                         default:
                             String msg = String.format(
                                     "Unsupported action %s, can be one of %s, 
or not set which indicate %s",
-                                    action, Arrays.asList(INSERT, UPDATE, 
DELETE), INSERT);
+                                    mutation.getType(), 
Arrays.toString(MutationType.values()), MutationType.INSERT);
                             throw new IllegalArgumentException(msg);
                     }
                 }
                 connection.commit();
                 swapList.forEach(Record::ack);
             } catch (Exception e) {
-                log.error("Got exception ", e);
+                log.error("Got exception ", e.getMessage(), e);
                 swapList.forEach(Record::fail);
             }
 
             if (swapList.size() != count) {
-                log.error("Update count {}  not match total number of records 
{}", count, swapList.size());
+                log.error("Update count {} not match total number of records 
{}", count, swapList.size());
             }
 
             // finish flush
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index acd35047b01..89e48980858 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -86,6 +86,35 @@ public class JdbcSinkConfig implements Serializable {
     )
     private int batchSize = 200;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "INSERT",
+            help = "If it is configured as UPSERT, the sink will use upsert 
semantics rather than "
+                    + "plain INSERT/UPDATE statements. Upsert semantics refer 
to atomically adding a new row or "
+                    + "updating the existing row if there is a primary key 
constraint violation, "
+                    + "which provides idempotence."
+    )
+    private InsertMode insertMode = InsertMode.INSERT;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "FAIL",
+            help = "How to handle records with null values, possible options 
are DELETE or FAIL."
+    )
+    private NullValueAction nullValueAction = NullValueAction.FAIL;
+
+    public enum InsertMode {
+        INSERT,
+        UPSERT,
+        UPDATE;
+    }
+
+    public enum NullValueAction {
+        FAIL,
+        DELETE
+    }
+
+
     public static JdbcSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index 96a39016c0b..637de0be96f 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -198,6 +198,13 @@ public class JdbcUtils {
         builder.append("UPDATE ");
         builder.append(table.tableId.getTableName());
         builder.append(" SET ");
+        StringJoiner setJoiner = buildUpdateSqlSetPart(table);
+        builder.append(setJoiner);
+        builder.append(combationWhere(table.keyColumns));
+        return builder.toString();
+    }
+
+    public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
         StringJoiner setJoiner = new StringJoiner(",");
 
         table.nonKeyColumns.forEach((columnId) ->{
@@ -205,9 +212,7 @@ public class JdbcUtils {
             equals.add(columnId.getName()).add("? ");
             setJoiner.add(equals.toString());
         });
-        builder.append(setJoiner.toString());
-        builder.append(combationWhere(table.keyColumns));
-        return builder.toString();
+        return setJoiner;
     }
 
     public static PreparedStatement buildUpdateStatement(Connection 
connection, String updateSQL) throws SQLException {
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index de31466466d..459468facf2 100644
--- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -93,20 +93,20 @@ public class BaseJdbcAutoSchemaSinkTest {
     @Test(expectedExceptions = UnsupportedOperationException.class,
             expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
     public void testNotSupportedAvroTypesBytes() {
-        BaseJdbcAutoSchemaSink.convertAvroField(null, 
createFieldAndGetSchema((builder) ->
+        BaseJdbcAutoSchemaSink.convertAvroField(new Object(), 
createFieldAndGetSchema((builder) ->
                 builder.name("field").type().bytesType().noDefault()));
     }
 
     @Test(expectedExceptions = UnsupportedOperationException.class,
             expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
     public void testNotSupportedAvroTypesFixed() {
-        BaseJdbcAutoSchemaSink.convertAvroField(null, 
createFieldAndGetSchema((builder) ->
+        BaseJdbcAutoSchemaSink.convertAvroField(new Object(), 
createFieldAndGetSchema((builder) ->
                 
builder.name("field").type().fixed("fix").size(16).noDefault()));
     }
     @Test(expectedExceptions = UnsupportedOperationException.class,
             expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
     public void testNotSupportedAvroTypesRecord() {
-        BaseJdbcAutoSchemaSink.convertAvroField(null, 
createFieldAndGetSchema((builder) ->
+        BaseJdbcAutoSchemaSink.convertAvroField(new Object(), 
createFieldAndGetSchema((builder) ->
                 builder.name("field").type()
                         .record("myrecord").fields()
                         .name("f1").type().intType().noDefault()
@@ -116,7 +116,7 @@ public class BaseJdbcAutoSchemaSinkTest {
     @Test(expectedExceptions = UnsupportedOperationException.class,
             expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
     public void testNotSupportedAvroTypesMap() {
-        BaseJdbcAutoSchemaSink.convertAvroField(null, 
createFieldAndGetSchema((builder) ->
+        BaseJdbcAutoSchemaSink.convertAvroField(new Object(), 
createFieldAndGetSchema((builder) ->
                 
builder.name("field").type().map().values().stringType().noDefault()));
     }
 
@@ -124,11 +124,19 @@ public class BaseJdbcAutoSchemaSinkTest {
     @Test(expectedExceptions = UnsupportedOperationException.class,
             expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
     public void testNotSupportedAvroTypesArray() {
-        BaseJdbcAutoSchemaSink.convertAvroField(null, 
createFieldAndGetSchema((builder) ->
+        BaseJdbcAutoSchemaSink.convertAvroField(new Object(), 
createFieldAndGetSchema((builder) ->
                 
builder.name("field").type().array().items().stringType().noDefault()));
     }
 
 
+    @Test
+    public void testConvertAvroNullValue() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField(null, 
createFieldAndGetSchema((builder) ->
+                builder.name("field").type().stringType().noDefault()));
+        Assert.assertNull(converted);
+    }
+
+
     private Schema 
createFieldAndGetSchema(Function<SchemaBuilder.FieldAssembler<Schema>,
             SchemaBuilder.FieldAssembler<Schema>> consumer) {
         final SchemaBuilder.FieldAssembler<Schema> record = 
SchemaBuilder.record("record")
diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index 3fc45bb3f7b..5c232db1aa0 100644
--- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import java.util.stream.Collectors;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 
@@ -29,4 +30,12 @@ import org.apache.pulsar.io.core.annotations.IOType;
 )
 public class MariadbJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
+    @Override
+    public String generateUpsertQueryStatement() {
+        final String keys = 
tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+                .collect(Collectors.joining(","));
+        return JdbcUtils.buildInsertSql(tableDefinition)
+                + "ON DUPLICATE KEY UPDATE " + 
JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
+    }
+
 }
diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index 9ed122eb3bb..d29e3e3aaca 100644
--- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import java.util.stream.Collectors;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 
@@ -29,4 +30,12 @@ import org.apache.pulsar.io.core.annotations.IOType;
 )
 public class PostgresJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
+    @Override
+    public String generateUpsertQueryStatement() {
+        final String keys = 
tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+                .collect(Collectors.joining(","));
+        return JdbcUtils.buildInsertSql(tableDefinition)
+                + " ON CONFLICT(" + keys + ") "
+                + "DO UPDATE SET " + 
JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
+    }
 }
diff --git a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
index 8cfd3f0962e..76655397003 100644
--- a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import java.util.stream.Collectors;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 
@@ -29,4 +30,12 @@ import org.apache.pulsar.io.core.annotations.IOType;
 )
 public class SqliteJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
+    @Override
+    public String generateUpsertQueryStatement() {
+        final String keys = 
tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+                .collect(Collectors.joining(","));
+        return JdbcUtils.buildInsertSql(tableDefinition)
+                + " ON CONFLICT(" + keys + ") "
+                + "DO UPDATE SET " + 
JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
+    }
 }
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
index b3ea296bf6f..deb2fb88aa8 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
@@ -69,53 +69,54 @@ public class JdbcUtilsTest {
                 "PRIMARY KEY (firstName, lastName));"
         );
 
-        Connection connection = sqliteUtils.getConnection();
+        try (Connection connection = sqliteUtils.getConnection(true);) {
 
-        // Test getTableId
-        log.info("verify getTableId");
-        TableId id = JdbcUtils.getTableId(connection, tableName);
-        Assert.assertEquals(id.getTableName(), tableName);
+            // Test getTableId
+            log.info("verify getTableId");
+            TableId id = JdbcUtils.getTableId(connection, tableName);
+            Assert.assertEquals(id.getTableName(), tableName);
 
-        // Test get getTableDefinition
-        log.info("verify getTableDefinition");
-        List<String> keyList = Lists.newArrayList();
-        keyList.add("firstName");
-        keyList.add("lastName");
-        List<String> nonKeyList = Lists.newArrayList();
-        nonKeyList.add("age");
-        nonKeyList.add("long");
-        TableDefinition table = JdbcUtils.getTableDefinition(connection, id, 
keyList, nonKeyList);
-        Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
-        Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
-        Assert.assertEquals(table.getColumns().get(2).getName(), "age");
-        Assert.assertEquals(table.getColumns().get(2).getTypeName(), 
"INTEGER");
-        Assert.assertEquals(table.getColumns().get(7).getName(), "float");
-        Assert.assertEquals(table.getColumns().get(7).getTypeName(), 
"NUMERIC");
-        Assert.assertEquals(table.getKeyColumns().get(0).getName(), 
"firstName");
-        Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(), 
"TEXT");
-        Assert.assertEquals(table.getKeyColumns().get(1).getName(), 
"lastName");
-        Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(), 
"TEXT");
-        Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), "age");
-        Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(), 
"INTEGER");
-        Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), "long");
-        Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(), 
"INTEGER");
-        // Test get getTableDefinition
-        log.info("verify buildInsertSql");
-        String expctedInsertStatement = "INSERT INTO " + tableName +
-            "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" 
+
-            " VALUES(?,?,?,?,?,?,?,?,?,?)";
-        String insertStatement = JdbcUtils.buildInsertSql(table);
-        Assert.assertEquals(insertStatement, expctedInsertStatement);
-        log.info("verify buildUpdateSql");
-        String expectedUpdateStatement = "UPDATE " + tableName +
-                " SET age=? ,long=?  WHERE firstName=? AND lastName=?";
-        String updateStatement = JdbcUtils.buildUpdateSql(table);
-        Assert.assertEquals(updateStatement, expectedUpdateStatement);
-        log.info("verify buildDeleteSql");
-        String expectedDeleteStatement = "DELETE FROM " + tableName +
-                " WHERE firstName=? AND lastName=?";
-        String deleteStatement = JdbcUtils.buildDeleteSql(table);
-        Assert.assertEquals(deleteStatement, expectedDeleteStatement);
+            // Test get getTableDefinition
+            log.info("verify getTableDefinition");
+            List<String> keyList = Lists.newArrayList();
+            keyList.add("firstName");
+            keyList.add("lastName");
+            List<String> nonKeyList = Lists.newArrayList();
+            nonKeyList.add("age");
+            nonKeyList.add("long");
+            TableDefinition table = JdbcUtils.getTableDefinition(connection, 
id, keyList, nonKeyList);
+            Assert.assertEquals(table.getColumns().get(0).getName(), 
"firstName");
+            Assert.assertEquals(table.getColumns().get(0).getTypeName(), 
"TEXT");
+            Assert.assertEquals(table.getColumns().get(2).getName(), "age");
+            Assert.assertEquals(table.getColumns().get(2).getTypeName(), 
"INTEGER");
+            Assert.assertEquals(table.getColumns().get(7).getName(), "float");
+            Assert.assertEquals(table.getColumns().get(7).getTypeName(), 
"NUMERIC");
+            Assert.assertEquals(table.getKeyColumns().get(0).getName(), 
"firstName");
+            Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(), 
"TEXT");
+            Assert.assertEquals(table.getKeyColumns().get(1).getName(), 
"lastName");
+            Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(), 
"TEXT");
+            Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), 
"age");
+            Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(), 
"INTEGER");
+            Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), 
"long");
+            Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(), 
"INTEGER");
+            // Test buildInsertSql
+            log.info("verify buildInsertSql");
+            String expctedInsertStatement = "INSERT INTO " + tableName +
+                    
"(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
+                    " VALUES(?,?,?,?,?,?,?,?,?,?)";
+            String insertStatement = JdbcUtils.buildInsertSql(table);
+            Assert.assertEquals(insertStatement, expctedInsertStatement);
+            log.info("verify buildUpdateSql");
+            String expectedUpdateStatement = "UPDATE " + tableName +
+                    " SET age=? ,long=?  WHERE firstName=? AND lastName=?";
+            String updateStatement = JdbcUtils.buildUpdateSql(table);
+            Assert.assertEquals(updateStatement, expectedUpdateStatement);
+            log.info("verify buildDeleteSql");
+            String expectedDeleteStatement = "DELETE FROM " + tableName +
+                    " WHERE firstName=? AND lastName=?";
+            String deleteStatement = JdbcUtils.buildDeleteSql(table);
+            Assert.assertEquals(deleteStatement, expectedDeleteStatement);
+        }
     }
 
 }
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 1ce3bc5b353..cb1ca844249 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -19,8 +19,20 @@
 
 package org.apache.pulsar.io.jdbc;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.util.Utf8;
@@ -47,22 +59,12 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * Jdbc Sink test
  */
 @Slf4j
 public class SqliteJdbcSinkTest {
-    private final SqliteUtils sqliteUtils = new 
SqliteUtils(getClass().getSimpleName());
+    private final SqliteUtils sqliteUtils = new 
SqliteUtils(UUID.randomUUID().toString());
     private BaseJdbcAutoSchemaSink jdbcSink;
     private final String tableName = "TestOpenAndWriteSink";
 
@@ -409,14 +411,87 @@ public class SqliteJdbcSinkTest {
         Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) -> 
{}), 0);
     }
 
-    @DataProvider(name = "schemaType")
+    private static class MockKeyValueGenericRecord implements 
Record<GenericObject> {
+
+        public MockKeyValueGenericRecord(Schema<KeyValue<GenericRecord, 
GenericRecord>> keyValueSchema) {
+            this.keyValueSchema = keyValueSchema;
+        }
+
+        int ackCount;
+        int failCount;
+        AtomicReference<KeyValue> keyValueHolder = new AtomicReference<>();
+        Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema;
+
+        private final GenericObject genericObject = new GenericObject() {
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return keyValueHolder.get();
+            }
+        };
+        @Override
+        public Optional<String> getTopicName() {
+            return Optional.of("topic");
+        }
+
+        @Override
+        public org.apache.pulsar.client.api.Schema getSchema() {
+            return keyValueSchema;
+        }
+
+        @Override
+        public GenericObject getValue() {
+            return genericObject;
+        }
+
+        public void setKeyValue(KeyValue kv) {
+            keyValueHolder.set(kv);
+        }
+
+        @Override
+        public void ack() {
+            ackCount++;
+        }
+
+        @Override
+        public void fail() {
+            failCount++;
+        }
+    }
+
+    @Data
+    @AllArgsConstructor
+    static class InsertModeTestConfig {
+        SchemaType schemaType;
+        JdbcSinkConfig.InsertMode insertMode;
+    }
+
+    @DataProvider(name = "insertModes")
     public Object[] schemaType() {
-        return new Object[]{SchemaType.JSON, SchemaType.AVRO};
+        List<SchemaType> schemas = Arrays.asList(SchemaType.JSON, 
SchemaType.AVRO);
+        JdbcSinkConfig.InsertMode[] insertModes = 
JdbcSinkConfig.InsertMode.values();
+        Object[] result = new Object[schemas.size() * insertModes.length];
+        int i = 0;
+        for (SchemaType schema : schemas) {
+            for (JdbcSinkConfig.InsertMode insertMode : insertModes) {
+                result[i++] = new InsertModeTestConfig(schema, insertMode);
+            }
+        }
+        return result;
     }
 
 
-    @Test(dataProvider = "schemaType")
-    public void testKeyValueSchema(SchemaType schemaType) throws Exception {
+    @Test(dataProvider = "insertModes")
+    public void testInsertMode(InsertModeTestConfig config) throws Exception {
+        jdbcSink.close();
+        final JdbcSinkConfig.InsertMode insertMode = config.getInsertMode();
+        final SchemaType schemaType = config.getSchemaType();
+
+        final String tableName = "kvtable_insertmode";
         RecordSchemaBuilder keySchemaBuilder = 
org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
         
keySchemaBuilder.field("key").type(SchemaType.STRING).optional().defaultValue(null);
         GenericSchema<GenericRecord> keySchema = 
Schema.generic(keySchemaBuilder.build(schemaType));
@@ -427,6 +502,7 @@ public class SqliteJdbcSinkTest {
         RecordSchemaBuilder valueSchemaBuilder = 
org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
         
valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
         
valueSchemaBuilder.field("stringutf8").type(SchemaType.STRING).optional().defaultValue(null);
+        
valueSchemaBuilder.field("nulltext").type(SchemaType.STRING).optional().defaultValue(null);
         
valueSchemaBuilder.field("int").type(SchemaType.INT32).optional().defaultValue(null);
         
valueSchemaBuilder.field("bool").type(SchemaType.BOOLEAN).optional().defaultValue(null);
         
valueSchemaBuilder.field("double").type(SchemaType.DOUBLE).optional().defaultValue(null);
@@ -434,7 +510,9 @@ public class SqliteJdbcSinkTest {
         
valueSchemaBuilder.field("long").type(SchemaType.INT64).optional().defaultValue(null);
         GenericSchema<GenericRecord> valueSchema = 
Schema.generic(valueSchemaBuilder.build(schemaType));
 
-        GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
+        Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema = 
Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+        MockKeyValueGenericRecord genericObjectRecord = new 
MockKeyValueGenericRecord(keyValueSchema);
+        genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord, 
valueSchema.newRecordBuilder()
                 .set("string", "thestring")
                 .set("stringutf8", schemaType == SchemaType.AVRO ? new 
Utf8("thestringutf8"): "thestringutf8")
                 .set("int", Integer.MAX_VALUE)
@@ -442,40 +520,10 @@ public class SqliteJdbcSinkTest {
                 .set("double", Double.MAX_VALUE)
                 .set("float", Float.MAX_VALUE)
                 .set("long", Long.MIN_VALUE)
-                .build();
-
-        Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema = 
Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
-        KeyValue<GenericRecord, GenericRecord> keyValue = new 
KeyValue<>(keyGenericRecord, valueGenericRecord);
-        GenericObject genericObject = new GenericObject() {
-            @Override
-            public SchemaType getSchemaType() {
-                return SchemaType.KEY_VALUE;
-            }
-
-            @Override
-            public Object getNativeObject() {
-                return keyValue;
-            }
-        };
-        Record<GenericObject> genericObjectRecord = new Record<>() {
-            @Override
-            public Optional<String> getTopicName() {
-                return Optional.of("topic");
-            }
-
-            @Override
-            public org.apache.pulsar.client.api.Schema getSchema() {
-                return keyValueSchema;
-            }
+                .build()));
 
-            @Override
-            public GenericObject getValue() {
-                return genericObject;
-            }
-        };
-        jdbcSink.close();
         sqliteUtils.createTable(
-                "CREATE TABLE kvtable (" +
+                "CREATE TABLE " + tableName + " (" +
                         "    key  TEXT," +
                         "    int  INTEGER," +
                         "    string TEXT," +
@@ -491,18 +539,19 @@ public class SqliteJdbcSinkTest {
 
         Map<String, Object> conf = Maps.newHashMap();
         conf.put("jdbcUrl", jdbcUrl);
-        conf.put("tableName", "kvtable");
+        conf.put("tableName", tableName);
         conf.put("key", "key");
         conf.put("nonKey", 
"long,int,double,float,bool,nulltext,string,stringutf8");
         // change batchSize to 1, to flush on each write.
         conf.put("batchSize", 1);
+        conf.put("insertMode", insertMode.toString());
         try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new 
SqliteJdbcAutoSchemaSink();) {
             kvSchemaJdbcSink.open(conf, null);
             kvSchemaJdbcSink.write(genericObjectRecord);
 
             Awaitility.await().untilAsserted(() -> {
                 final int count = sqliteUtils.select("select 
int,string,stringutf8,bool,double,float," +
-                        "long,nulltext from kvtable where key='mykey'", 
(resultSet) -> {
+                        "long,nulltext from " + tableName + " where 
key='mykey'", (resultSet) -> {
                     int index = 1;
                     Assert.assertEquals(resultSet.getInt(index++), 
Integer.MAX_VALUE);
                     Assert.assertEquals(resultSet.getString(index++), 
"thestring");
@@ -513,9 +562,175 @@ public class SqliteJdbcSinkTest {
                     Assert.assertEquals(resultSet.getLong(index++), 
Long.MIN_VALUE);
                     Assert.assertNull(resultSet.getString(index++));
                 });
-                Assert.assertEquals(count, 1);
+                if (insertMode == JdbcSinkConfig.InsertMode.INSERT
+                        || insertMode == JdbcSinkConfig.InsertMode.UPSERT) {
+                    Assert.assertEquals(count, 1);
+                } else {
+                    Assert.assertEquals(count, 0);
+                }
+            });
+        }
+
+        if (insertMode == JdbcSinkConfig.InsertMode.UPDATE) {
+            conf.put("insertMode", JdbcSinkConfig.InsertMode.INSERT);
+            try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new 
SqliteJdbcAutoSchemaSink();) {
+                kvSchemaJdbcSink.open(conf, null);
+                kvSchemaJdbcSink.write(genericObjectRecord);
+                Awaitility.await().untilAsserted(() -> {
+                    final int count = sqliteUtils.select("select key from " + 
tableName + " where key='mykey'", (resultSet) -> {
+                    });
+                    Assert.assertEquals(count, 1);
+                });
+            }
+        }
+        conf.put("insertMode", insertMode.toString());
+        try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new 
SqliteJdbcAutoSchemaSink();) {
+            kvSchemaJdbcSink.open(conf, null);
+            genericObjectRecord = new 
MockKeyValueGenericRecord(keyValueSchema);
+            genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord, 
valueSchema.newRecordBuilder()
+                    .set("string", "thestring_updated")
+                    .set("stringutf8", schemaType == SchemaType.AVRO ?
+                            new Utf8("thestringutf8_updated"): 
"thestringutf8_updated")
+                    .set("int", Integer.MIN_VALUE)
+                    .set("bool", false)
+                    .set("double", Double.MIN_VALUE)
+                    .set("float", Float.MIN_VALUE)
+                    .set("long", Long.MAX_VALUE)
+                    .set("nulltext", "nomore-null")
+                    .build()));
+            kvSchemaJdbcSink.write(genericObjectRecord);
+
+            Awaitility.await().untilAsserted(() -> {
+                final int count = sqliteUtils.select("select 
int,string,stringutf8,bool,double,float," +
+                        "long,nulltext from " + tableName + "  where 
key='mykey'", (resultSet) -> {
+                    int index = 1;
+                    if (insertMode == JdbcSinkConfig.InsertMode.INSERT) {
+                        Assert.assertEquals(resultSet.getInt(index++), 
Integer.MAX_VALUE);
+                        Assert.assertEquals(resultSet.getString(index++), 
"thestring");
+                        Assert.assertEquals(resultSet.getString(index++), 
"thestringutf8");
+                        Assert.assertEquals(resultSet.getBoolean(index++), 
true);
+                        Assert.assertEquals(resultSet.getDouble(index++), 
Double.MAX_VALUE);
+                        Assert.assertEquals(resultSet.getFloat(index++), 
Float.MAX_VALUE);
+                        Assert.assertEquals(resultSet.getLong(index++), 
Long.MIN_VALUE);
+                        Assert.assertNull(resultSet.getString(index++));
+                    } else {
+                        Assert.assertEquals(resultSet.getInt(index++), 
Integer.MIN_VALUE);
+                        Assert.assertEquals(resultSet.getString(index++), 
"thestring_updated");
+                        Assert.assertEquals(resultSet.getString(index++), 
"thestringutf8_updated");
+                        Assert.assertEquals(resultSet.getBoolean(index++), 
false);
+                        Assert.assertEquals(resultSet.getDouble(index++), 
Double.MIN_VALUE);
+                        Assert.assertEquals(resultSet.getFloat(index++), 
Float.MIN_VALUE);
+                        Assert.assertEquals(resultSet.getLong(index++), 
Long.MAX_VALUE);
+                        Assert.assertEquals(resultSet.getString(index++), 
"nomore-null");
+                    }
+                });
+                if (insertMode == JdbcSinkConfig.InsertMode.INSERT) {
+                    Assert.assertEquals(count, 1);
+                } else if (insertMode == JdbcSinkConfig.InsertMode.UPSERT) {
+                    Assert.assertEquals(count, 1);
+                } else {
+                    Assert.assertEquals(count, 1);
+                }
             });
         }
     }
 
+    @Data
+    @AllArgsConstructor
+    static class NullValueActionTestConfig {
+        SchemaType schemaType;
+        JdbcSinkConfig.NullValueAction nullValueAction;
+    }
+    @DataProvider(name = "nullValueActions")
+    public Object[] nullValueActions() {
+        List<SchemaType> schemas = Arrays.asList(SchemaType.JSON, 
SchemaType.AVRO);
+        JdbcSinkConfig.NullValueAction[] nullValueActions = 
JdbcSinkConfig.NullValueAction.values();
+        Object[] result = new Object[schemas.size() * nullValueActions.length];
+        int i = 0;
+        for (SchemaType schema : schemas) {
+            for (JdbcSinkConfig.NullValueAction nullValueAction : 
nullValueActions) {
+                result[i++] = new NullValueActionTestConfig(schema, 
nullValueAction);
+            }
+        }
+        return result;
+    }
+
+    @Test(dataProvider = "nullValueActions")
+    public void testNullValueAction(NullValueActionTestConfig config) throws 
Exception {
+        jdbcSink.close();
+        final SchemaType schemaType = config.getSchemaType();
+        final JdbcSinkConfig.NullValueAction nullValueAction = 
config.getNullValueAction();
+        final String tableName = "kvtable_nullvalue";
+
+        RecordSchemaBuilder keySchemaBuilder = 
org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+        
keySchemaBuilder.field("key").type(SchemaType.STRING).optional().defaultValue(null);
+        GenericSchema<GenericRecord> keySchema = 
Schema.generic(keySchemaBuilder.build(schemaType));
+        GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
+                .set("key", "mykey")
+                .build();
+
+        RecordSchemaBuilder valueSchemaBuilder = 
org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+        
valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
+        GenericSchema<GenericRecord> valueSchema = 
Schema.generic(valueSchemaBuilder.build(schemaType));
+
+        Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema = 
Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+        MockKeyValueGenericRecord genericObjectRecord = new 
MockKeyValueGenericRecord(keyValueSchema);
+
+        sqliteUtils.createTable("CREATE TABLE " + tableName + " (" +
+                        "    key  TEXT," +
+                        "    string TEXT," +
+                        "PRIMARY KEY (key));"
+        );
+        String jdbcUrl = sqliteUtils.sqliteUri();
+
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+        conf.put("key", "key");
+        conf.put("nonKey", "string");
+        conf.put("batchSize", 3);
+        conf.put("insertMode", JdbcSinkConfig.InsertMode.UPSERT.toString());
+        conf.put("nullValueAction", nullValueAction.toString());
+        try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new 
SqliteJdbcAutoSchemaSink();) {
+            kvSchemaJdbcSink.open(conf, null);
+            genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord, 
valueSchema.newRecordBuilder()
+                    .set("string", "thestring")
+                    .build()));
+            kvSchemaJdbcSink.write(genericObjectRecord);
+
+            genericObjectRecord = new 
MockKeyValueGenericRecord(keyValueSchema);
+            genericObjectRecord.setKeyValue(new 
KeyValue<>(keySchema.newRecordBuilder()
+                    .set("key", "mykey2")
+                    .build(), valueSchema.newRecordBuilder()
+                    .set("string", "thestring")
+                    .build()));
+            kvSchemaJdbcSink.write(genericObjectRecord);
+
+            genericObjectRecord = new 
MockKeyValueGenericRecord(keyValueSchema);
+            genericObjectRecord.setKeyValue(new KeyValue<>(keyGenericRecord, 
null));
+            kvSchemaJdbcSink.write(genericObjectRecord);
+
+            Awaitility.await().untilAsserted(() -> {
+                final int count = sqliteUtils.select("select key,string from " 
+ tableName, (resultSet) -> {
+                    int index = 1;
+                    final String key = resultSet.getString(index++);
+                    final String value = resultSet.getString(index++);
+                    if (key.equals("mykey2")) {
+                        Assert.assertEquals(value, "thestring");
+                    } else {
+                        throw new IllegalStateException();
+                    }
+                });
+                if (nullValueAction == JdbcSinkConfig.NullValueAction.DELETE) {
+                    Assert.assertEquals(count, 1);
+                } else if (nullValueAction == 
JdbcSinkConfig.NullValueAction.FAIL) {
+                    Assert.assertEquals(count, 0);
+                } else {
+                    throw new IllegalStateException();
+                }
+            });
+
+        }
+    }
+
 }
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
index bb855d64bf1..f8cdf76f7d3 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
+++ 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
@@ -49,9 +49,9 @@ public final class SqliteUtils {
 
     private final Path dbPath;
 
-    private Connection connection;
-
-    public Connection getConnection() {
+    public Connection getConnection(boolean autocommit) throws SQLException {
+        Connection connection = DriverManager.getConnection(sqliteUri());
+        connection.setAutoCommit(autocommit);
         return connection;
     }
 
@@ -65,12 +65,9 @@ public final class SqliteUtils {
 
     public void setUp() throws SQLException, IOException {
         Files.deleteIfExists(dbPath);
-        connection = DriverManager.getConnection(sqliteUri());
-        connection.setAutoCommit(false);
     }
 
     public void tearDown() throws SQLException, IOException {
-        connection.close();
         Files.deleteIfExists(dbPath);
     }
 
@@ -91,7 +88,8 @@ public final class SqliteUtils {
 
     public int select(final String query, final ResultSetReadCallback 
callback) throws SQLException {
         int count = 0;
-        try (Statement stmt = connection.createStatement()) {
+        try (Connection connection = getConnection(true);
+             Statement stmt = connection.createStatement()) {
             try (ResultSet rs = stmt.executeQuery(query)) {
                 while (rs.next()) {
                     callback.read(rs);
@@ -103,9 +101,9 @@ public final class SqliteUtils {
     }
 
     public void execute(String sql) throws SQLException {
-        try (Statement stmt = connection.createStatement()) {
+        try (Connection connection = getConnection(true);
+                Statement stmt = connection.createStatement()) {
             stmt.executeUpdate(sql);
-            connection.commit();
         }
     }
 
diff --git a/site2/docs/io-jdbc-sink.md b/site2/docs/io-jdbc-sink.md
index 87b06c544c4..abc028132c3 100644
--- a/site2/docs/io-jdbc-sink.md
+++ b/site2/docs/io-jdbc-sink.md
@@ -8,6 +8,7 @@ The JDBC sink connectors allow pulling messages from Pulsar 
topics
 and persists the messages to ClickHouse, MariaDB, PostgreSQL, and SQLite.
 
 > Currently, INSERT, DELETE and UPDATE operations are supported.
+> SQLite, MariaDB and PostgreSQL also support UPSERT operations and idempotent 
writes.
 
 ## Configuration 
 
@@ -25,6 +26,8 @@ The configuration of all JDBC sink connectors has the 
following properties.
 | `key`       | String | false    | " " (empty string) | A comma-separated 
list containing the fields used in `where` condition of updating and deleting 
events.                  |
 | `timeoutMs` | int    | false    | 500                | The JDBC operation 
timeout in milliseconds.                                                        
                      |
 | `batchSize` | int    | false    | 200                | The batch size of 
updates made to the database.                                                   
                       |
+| `insertMode` | enum(INSERT, UPSERT, UPDATE) | false    | INSERT | If it is 
configured as UPSERT, the sink uses upsert semantics rather than plain 
INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row 
or updating the existing row if there is a primary key constraint violation, 
which provides idempotence. |
+| `nullValueAction` | enum(FAIL, DELETE) | false    | FAIL | How to handle 
records with NULL values. Possible options are `DELETE` or `FAIL`. |
 
 ### Example for ClickHouse
 

Reply via email to