This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b52a92bbb4 [improve][io] JDBC sinks: implement JDBC Batch API (#18017)
7b52a92bbb4 is described below

commit 7b52a92bbb4dca4976233f2e99da289c38856a1a
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Oct 20 09:48:21 2022 +0200

    [improve][io] JDBC sinks: implement JDBC Batch API (#18017)
    
    * [improve][io] JDBC sinks: implement JDBC Batch API
    
    * more tests and transactions support
    
    * remove .db files
    
    * doc
    
    * fix batch results and thread safety
    
    * add next flush test - fix doc - improve code readability
---
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    | 174 ++++++++----
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  |  20 +-
 .../pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java    |  35 +++
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 307 +++++++++++++++------
 .../org/apache/pulsar/io/jdbc/SqliteUtils.java     |  17 ++
 site2/docs/io-jdbc-sink.md                         |  27 +-
 6 files changed, 433 insertions(+), 147 deletions(-)

diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 74a19b7b187..06beaaacf9e 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -23,7 +23,13 @@ import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -32,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
@@ -64,8 +71,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
     protected JdbcUtils.TableDefinition tableDefinition;
 
     // for flush
-    private List<Record<T>> incomingList;
-    private List<Record<T>> swapList;
+    private Deque<Record<T>> incomingList;
     private AtomicBoolean isFlushing;
     private int batchSize;
     private ScheduledExecutorService flushExecutor;
@@ -73,6 +79,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         jdbcSinkConfig = JdbcSinkConfig.load(config);
+        jdbcSinkConfig.validate();
 
         jdbcUrl = jdbcSinkConfig.getJdbcUrl();
         if (jdbcSinkConfig.getJdbcUrl() == null) {
@@ -100,12 +107,13 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
 
         int timeoutMs = jdbcSinkConfig.getTimeoutMs();
         batchSize = jdbcSinkConfig.getBatchSize();
-        incomingList = Lists.newArrayList();
-        swapList = Lists.newArrayList();
+        incomingList = new LinkedList<>();
         isFlushing = new AtomicBoolean(false);
 
         flushExecutor = Executors.newScheduledThreadPool(1);
-        flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+        if (timeoutMs > 0) {
+            flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, 
timeoutMs, TimeUnit.MILLISECONDS);
+        }
     }
 
     private void initStatement()  throws Exception {
@@ -173,11 +181,14 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
     @Override
     public void write(Record<T> record) throws Exception {
         int number;
-        synchronized (this) {
+        synchronized (incomingList) {
             incomingList.add(record);
             number = incomingList.size();
         }
-        if (number == batchSize) {
+        if (batchSize > 0 && number >= batchSize) {
+            if (log.isDebugEnabled()) {
+                log.debug("flushing by batches, hit batch size {}", batchSize);
+            }
             flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
         }
     }
@@ -220,49 +231,46 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
 
 
     private void flush() {
-        // if not in flushing state, do flush, else return;
         if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Starting flush, queue size: {}", 
incomingList.size());
-            }
-            if (!swapList.isEmpty()) {
-                throw new IllegalStateException("swapList should be empty 
since last flush. swapList.size: "
-                        + swapList.size());
-            }
-            synchronized (this) {
-                List<Record<T>> tmpList;
-                swapList.clear();
+            boolean needAnotherRound;
+            final Deque<Record<T>> swapList = new LinkedList<>();
+
+            synchronized (incomingList) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Starting flush, queue size: {}", 
incomingList.size());
+                }
+                final int actualBatchSize = batchSize > 0 ? 
Math.min(incomingList.size(), batchSize) :
+                        incomingList.size();
 
-                tmpList = swapList;
-                swapList = incomingList;
-                incomingList = tmpList;
+                for (int i = 0; i < actualBatchSize; i++) {
+                    swapList.add(incomingList.removeFirst());
+                }
+                needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && 
incomingList.size() >= batchSize;
             }
+            long start = System.nanoTime();
 
             int count = 0;
             try {
+                PreparedStatement currentBatch = null;
+                final List<Mutation> mutations = swapList
+                        .stream()
+                        .map(this::createMutation)
+                        .collect(Collectors.toList());
                 // bind each record value
-                for (Record<T> record : swapList) {
-                    final Mutation mutation = createMutation(record);
+                PreparedStatement statement;
+                for (Mutation mutation : mutations) {
                     switch (mutation.getType()) {
                         case DELETE:
-                            bindValue(deleteStatement, mutation);
-                            count += 1;
-                            deleteStatement.execute();
+                            statement = deleteStatement;
                             break;
                         case UPDATE:
-                            bindValue(updateStatement, mutation);
-                            count += 1;
-                            updateStatement.execute();
+                            statement = updateStatement;
                             break;
                         case INSERT:
-                            bindValue(insertStatement, mutation);
-                            count += 1;
-                            insertStatement.execute();
+                            statement = insertStatement;
                             break;
                         case UPSERT:
-                            bindValue(upsertStatement, mutation);
-                            count += 1;
-                            upsertStatement.execute();
+                            statement = upsertStatement;
                             break;
                         default:
                             String msg = String.format(
@@ -270,13 +278,34 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
                                     mutation.getType(), 
Arrays.toString(MutationType.values()), MutationType.INSERT);
                             throw new IllegalArgumentException(msg);
                     }
+                    bindValue(statement, mutation);
+                    count += 1;
+                    if (jdbcSinkConfig.isUseJdbcBatch()) {
+                        if (currentBatch != null && statement != currentBatch) 
{
+                            internalFlushBatch(swapList, currentBatch, count, 
start);
+                            start = System.nanoTime();
+                        }
+                        statement.addBatch();
+                        currentBatch = statement;
+                    } else {
+                        statement.execute();
+                        if (!jdbcSinkConfig.isUseTransactions()) {
+                            swapList.removeFirst().ack();
+                        }
+                    }
                 }
-                if (jdbcSinkConfig.isUseTransactions()) {
-                    connection.commit();
+
+                if (jdbcSinkConfig.isUseJdbcBatch()) {
+                    internalFlushBatch(swapList, currentBatch, count, start);
+                } else {
+                    internalFlush(swapList);
                 }
-                swapList.forEach(Record::ack);
             } catch (Exception e) {
-                log.error("Got exception {}", e.getMessage(), e);
+                log.error("Got exception {} after {} ms, failing {} messages",
+                        e.getMessage(),
+                        (System.nanoTime() - start) / 1000 / 1000,
+                        swapList.size(),
+                        e);
                 swapList.forEach(Record::fail);
                 try {
                     if (jdbcSinkConfig.isUseTransactions()) {
@@ -287,16 +316,10 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
                 }
             }
 
-            if (swapList.size() != count) {
-                log.error("Update count {} not match total number of records 
{}", count, swapList.size());
-            }
-
-            // finish flush
-            if (log.isDebugEnabled()) {
-                log.debug("Finish flush, queue size: {}", swapList.size());
-            }
-            swapList.clear();
             isFlushing.set(false);
+            if (needAnotherRound) {
+                flush();
+            }
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Already in flushing state, will not flush, queue 
size: {}", incomingList.size());
@@ -304,4 +327,59 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
         }
     }
 
+    private void internalFlush(Deque<Record<T>> swapList) throws SQLException {
+        if (jdbcSinkConfig.isUseTransactions()) {
+            connection.commit();
+            swapList.forEach(Record::ack);
+        }
+    }
+
+    private void internalFlushBatch(Deque<Record<T>> swapList, 
PreparedStatement currentBatch, int count, long start) throws SQLException {
+        executeBatch(swapList, currentBatch);
+        if (log.isDebugEnabled()) {
+            log.debug("Flushed {} messages in {} ms", count, 
(System.nanoTime() - start) / 1000 / 1000);
+        }
+    }
+
+    private void executeBatch(Deque<Record<T>> swapList, PreparedStatement 
statement) throws SQLException {
+        final int[] results = statement.executeBatch();
+        Map<Integer, Integer> failuresMapping = null;
+        final boolean useTransactions = jdbcSinkConfig.isUseTransactions();
+
+        for (int r: results) {
+            if (isBatchItemFailed(r)) {
+                if (failuresMapping == null) {
+                    failuresMapping = new HashMap<>();
+                }
+                final Integer current = failuresMapping.computeIfAbsent(r, 
code -> 1);
+                failuresMapping.put(r, current + 1);
+            }
+        }
+        if (failuresMapping == null || failuresMapping.isEmpty()) {
+            if (useTransactions) {
+                connection.commit();
+            }
+            for (int r: results) {
+                swapList.removeFirst().ack();
+            }
+        } else {
+            if (useTransactions) {
+                connection.rollback();
+            }
+            for (int r: results) {
+                swapList.removeFirst().fail();
+            }
+            String msg = "Batch failed, got error results (error_code->count): 
" + failuresMapping;
+            // throwing an exception here means the main loop cycle will nack 
the messages in the next batch
+            throw new SQLException(msg);
+        }
+    }
+
+    private static boolean isBatchItemFailed(int returnCode) {
+        if (returnCode == Statement.SUCCESS_NO_INFO || returnCode >= 0) {
+            return false;
+        }
+        return true;
+    }
+
 }
diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 74f339bd443..ac9a36be796 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -87,15 +87,24 @@ public class JdbcSinkConfig implements Serializable {
     @FieldDoc(
         required = false,
         defaultValue = "500",
-        help = "The jdbc operation timeout in milliseconds"
+        help = "Enable batch mode by time. After timeoutMs milliseconds the 
operations queue will be flushed."
     )
     private int timeoutMs = 500;
     @FieldDoc(
         required = false,
         defaultValue = "200",
-        help = "The batch size of updates made to the database"
+        help = "Enable batch mode by number of operations. This value is the 
max number of operations "
+                + "batched in the same transaction/batch."
     )
     private int batchSize = 200;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "false",
+            help = "Use the JDBC batch API. This option is suggested to 
improve write performance."
+    )
+    private boolean useJdbcBatch = false;
+
     @FieldDoc(
             required = false,
             defaultValue = "true",
@@ -141,4 +150,11 @@ public class JdbcSinkConfig implements Serializable {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
JdbcSinkConfig.class);
     }
+
+    public void validate() {
+        if (timeoutMs <= 0 && batchSize <= 0) {
+            throw new IllegalArgumentException("timeoutMs or batchSize must be 
set to a positive value.");
+        }
+    }
+
 }
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java
new file mode 100644
index 00000000000..012b37bacec
--- /dev/null
+++ 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import java.util.Map;
+
+/**
+ * Jdbc Sink test with the JDBC Batch API enabled
+ */
+@Slf4j
+public class SqliteJdbcSinkBatchTest extends SqliteJdbcSinkTest {
+
+    @Override
+    protected void configure(Map<String, Object> configuration) {
+        configuration.put("useJdbcBatch", "true");
+    }
+}
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 030a9b4187b..38cdc29a1c9 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -25,17 +25,22 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -45,9 +50,7 @@ import 
org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -72,6 +75,8 @@ public class SqliteJdbcSinkTest {
      * A Simple class to test jdbc class
      */
     @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
     public static class Foo {
         private String field1;
         private String field2;
@@ -96,17 +101,26 @@ public class SqliteJdbcSinkTest {
         // prepare data for delete sql
         String deleteSql = "insert into " + tableName + " 
values('ValueOfField5', 'ValueOfField5', 5)";
         sqliteUtils.execute(deleteSql);
-        Map<String, Object> conf;
+        restartSinkWithConfig(null);
+    }
 
+    private void restartSinkWithConfig(Map<String, Object> additional) throws 
Exception {
+        if (jdbcSink != null) {
+            jdbcSink.close();
+        }
         String jdbcUrl = sqliteUtils.sqliteUri();
 
-        conf = Maps.newHashMap();
+        Map<String, Object> conf = Maps.newHashMap();
         conf.put("jdbcUrl", jdbcUrl);
         conf.put("tableName", tableName);
         conf.put("key", "field3");
         conf.put("nonKey", "field1,field2");
         // change batchSize to 1, to flush on each write.
         conf.put("batchSize", 1);
+        if (additional != null) {
+            conf.putAll(additional);
+        }
+        configure(conf);
 
         jdbcSink = new SqliteJdbcAutoSchemaSink();
 
@@ -114,6 +128,9 @@ public class SqliteJdbcSinkTest {
         jdbcSink.open(conf, null);
     }
 
+    protected void configure(Map<String, Object> configuration) {
+    }
+
     @AfterMethod(alwaysRun = true)
     public void tearDown() throws Exception {
         jdbcSink.close();
@@ -121,7 +138,7 @@ public class SqliteJdbcSinkTest {
     }
 
     private void testOpenAndWriteSinkNullValue(Map<String, String> 
actionProperties) throws Exception {
-        Message<GenericObject> insertMessage = mock(MessageImpl.class);
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
         GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
         Foo insertObj = new Foo();
@@ -129,26 +146,8 @@ public class SqliteJdbcSinkTest {
         // Not setting field2
         // Field1 is the key and field3 is used for selecting records 
         insertObj.setField3(3);
-        AvroSchema<Foo> schema = 
AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
-
-        byte[] insertBytes = schema.encode(insertObj);
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericObject> insertRecord = 
PulsarRecord.<GenericObject>builder()
-            .message(insertMessage)
-            .topicName("fake_topic_name")
-            .ackFunction(() -> future.complete(null))
-            .build();
-
-        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
-        
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
-        when(insertMessage.getProperties()).thenReturn(actionProperties);
-        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
-                insertObj.toString(),
-                insertMessage.getValue().toString(),
-                insertRecord.getValue().toString());
-
-        // write should success.
-        jdbcSink.write(insertRecord);
+        final Record<GenericObject> record = createMockFooRecord(insertObj, 
actionProperties, future);
+        jdbcSink.write(record);
         log.info("executed write");
         // sleep to wait backend flush complete
         future.get(1, TimeUnit.SECONDS);
@@ -165,33 +164,16 @@ public class SqliteJdbcSinkTest {
     }
 
     private void testOpenAndWriteSinkJson(Map<String, String> 
actionProperties) throws Exception {
-        Message<GenericObject> insertMessage = mock(MessageImpl.class);
-        GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
         Foo insertObj = new Foo();
         insertObj.setField1("ValueOfField1");
         insertObj.setField2("ValueOfField2");
         insertObj.setField3(3);
-        JSONSchema<Foo> schema = 
JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
-
-        byte[] insertBytes = schema.encode(insertObj);
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericObject> insertRecord = 
PulsarRecord.<GenericObject>builder()
-            .message(insertMessage)
-            .topicName("fake_topic_name")
-            .ackFunction(() -> future.complete(null))
-            .build();
-
-        GenericSchema<GenericRecord> decodeSchema = 
GenericSchemaImpl.of(schema.getSchemaInfo());
-        
when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
-        when(insertMessage.getProperties()).thenReturn(actionProperties);
-        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
-                insertObj.toString(),
-                insertMessage.getValue().toString(),
-                insertRecord.getValue().toString());
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final Record<GenericObject> record = createMockFooRecord(insertObj, 
actionProperties, future);
 
         // write should success.
-        jdbcSink.write(insertRecord);
+        jdbcSink.write(record);
         log.info("executed write");
         // sleep to wait backend flush complete
         future.get(1, TimeUnit.SECONDS);
@@ -216,26 +198,11 @@ public class SqliteJdbcSinkTest {
         // Not setting field2
         // Field1 is the key and field3 is used for selecting records 
         insertObj.setField3(3);
-        JSONSchema<Foo> schema = 
JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
-
-        byte[] insertBytes = schema.encode(insertObj);
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericObject> insertRecord = 
PulsarRecord.<GenericObject>builder()
-            .message(insertMessage)
-            .topicName("fake_topic_name")
-            .ackFunction(() -> future.complete(null))
-            .build();
-
-        GenericSchema<GenericRecord> decodeSchema = 
GenericSchemaImpl.of(schema.getSchemaInfo());
-        
when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
-        when(insertMessage.getProperties()).thenReturn(actionProperties);
-        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
-                insertObj.toString(),
-                insertMessage.getValue().toString(),
-                insertRecord.getValue().toString());
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final Record<GenericObject> record = createMockFooRecord(insertObj, 
actionProperties, future);
 
         // write should success.
-        jdbcSink.write(insertRecord);
+        jdbcSink.write(record);
         log.info("executed write");
         // sleep to wait backend flush complete
         // sleep to wait backend flush complete
@@ -260,26 +227,11 @@ public class SqliteJdbcSinkTest {
         insertObj.setField1("ValueOfField1");
         insertObj.setField2("ValueOfField2");
         insertObj.setField3(3);
-        AvroSchema<Foo> schema = 
AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
-        byte[] insertBytes = schema.encode(insertObj);
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericObject> insertRecord = 
PulsarRecord.<GenericObject>builder()
-            .message(insertMessage)
-            .topicName("fake_topic_name")
-            .ackFunction(() -> future.complete(null))
-            .build();
-
-        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
-        
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
-        when(insertMessage.getProperties()).thenReturn(actionProperties);
-        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
-                insertObj.toString(),
-                insertMessage.getValue().toString(),
-                insertRecord.getValue().toString());
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final Record<GenericObject> record = createMockFooRecord(insertObj, 
actionProperties, future);
 
         // write should success.
-        jdbcSink.write(insertRecord);
+        jdbcSink.write(record);
         log.info("executed write");
         // sleep to wait backend flush complete
         future.get(1, TimeUnit.SECONDS);
@@ -342,7 +294,7 @@ public class SqliteJdbcSinkTest {
 
         byte[] updateBytes = schema.encode(updateObj);
         Message<GenericObject> updateMessage = mock(MessageImpl.class);
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
         Record<GenericObject> updateRecord = 
PulsarRecord.<GenericObject>builder()
                 .message(updateMessage)
                 .topicName("fake_topic_name")
@@ -383,7 +335,7 @@ public class SqliteJdbcSinkTest {
 
         byte[] deleteBytes = schema.encode(deleteObj);
         Message<GenericObject> deleteMessage = mock(MessageImpl.class);
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
         Record<GenericObject> deleteRecord = 
PulsarRecord.<GenericObject>builder()
                 .message(deleteMessage)
                 .topicName("fake_topic_name")
@@ -409,6 +361,172 @@ public class SqliteJdbcSinkTest {
         Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) -> 
{}), 0);
     }
 
+    @Test
+    public void testBatchMode() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("batchSize", 3);
+        config.put("timeoutMs", 0);
+        restartSinkWithConfig(config);
+        Foo updateObj = new Foo();
+        updateObj.setField1("f1");
+        updateObj.setField2("f12");
+        updateObj.setField3(1);
+        Map<String, String> updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "INSERT");
+        final CompletableFuture<Boolean> futureByEntries1 = new 
CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(updateObj, updateProperties, 
futureByEntries1));
+        Assert.assertThrows(TimeoutException.class, () -> 
futureByEntries1.get(1, TimeUnit.SECONDS));
+        final CompletableFuture<Boolean> futureByEntries2 = new 
CompletableFuture<>();
+        updateProperties.put("ACTION", "UPDATE");
+        updateObj.setField2("f13");
+        jdbcSink.write(createMockFooRecord(updateObj, updateProperties, 
futureByEntries2));
+        Assert.assertThrows(TimeoutException.class, () -> 
futureByEntries1.get(1, TimeUnit.SECONDS));
+        Assert.assertThrows(TimeoutException.class, () -> 
futureByEntries2.get(1, TimeUnit.SECONDS));
+        updateObj.setField2("f14");
+        final CompletableFuture<Boolean> futureByEntries3 = new 
CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(updateObj, updateProperties, 
futureByEntries3));
+        futureByEntries1.get(1, TimeUnit.SECONDS);
+        futureByEntries2.get(1, TimeUnit.SECONDS);
+        futureByEntries3.get(1, TimeUnit.SECONDS);
+
+        config.put("batchSize", 0);
+        config.put("timeoutMs", TimeUnit.SECONDS.toMillis(3));
+        restartSinkWithConfig(config);
+        final CompletableFuture<Boolean> futureByTime = new 
CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(updateObj, updateProperties, 
futureByTime));
+        Assert.assertThrows(TimeoutException.class, () -> futureByTime.get(1, 
TimeUnit.SECONDS));
+        futureByTime.get(3, TimeUnit.SECONDS);
+
+    }
+
+    /**
+     * Verify that if the flush is finished but the incoming records list size 
is equals
+     * or greater than the batch size,
+     * the next flush is immediately triggered (without any other writes).
+     * @throws Exception
+     */
+    @Test
+    public void testBatchModeContinueFlushing() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("batchSize", 1);
+        config.put("timeoutMs", 0);
+        restartSinkWithConfig(config);
+        // block the auto flushing mechanism
+        FieldUtils.writeField(jdbcSink, "isFlushing", new AtomicBoolean(true), 
true);
+        Foo updateObj = new Foo("f1", "f12", 1);
+        Map<String, String> updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "INSERT");
+        final CompletableFuture<Boolean> futureByEntries1 = new 
CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(updateObj, updateProperties, 
futureByEntries1));
+        Assert.assertThrows(TimeoutException.class, () -> 
futureByEntries1.get(1, TimeUnit.SECONDS));
+
+        FieldUtils.writeField(jdbcSink, "isFlushing", new 
AtomicBoolean(false), true);
+
+        updateObj = new Foo("f2", "f12", 1);
+        updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "INSERT");
+        final CompletableFuture<Boolean> futureByEntries2 = new 
CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(updateObj, updateProperties, 
futureByEntries2));
+
+        futureByEntries1.get(1, TimeUnit.SECONDS);
+        futureByEntries2.get(1, TimeUnit.SECONDS);
+    }
+
+    @DataProvider(name = "useTransactions")
+    public Object[] useTransactions() {
+        return Arrays.asList(true, false).toArray();
+    }
+
+    @Test(dataProvider = "useTransactions")
+    public void testBatchModeFailures(boolean useTransactions) throws 
Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+        sqliteUtils.execute("delete from " + tableName);
+        restartSinkWithConfig(null);
+        Foo foo = new Foo("f1", "f2", 1);
+        Map<String, String> updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "INSERT");
+        final CompletableFuture<Boolean> future0 = new CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(foo, updateProperties, future0));
+        Assert.assertTrue(future0.get());
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("batchSize", 5);
+        config.put("useTransactions", useTransactions);
+        restartSinkWithConfig(config);
+
+        foo = new Foo("f2", "f2", 2);
+        updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "INSERT");
+        final CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(foo, updateProperties, future2));
+
+        foo = new Foo("f3", "f2", 3);
+        updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "INSERT");
+        final CompletableFuture<Boolean> future3 = new CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(foo, updateProperties, future3));
+
+        foo = new Foo("f1", "f21", 11);
+        updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "UPDATE");
+        final CompletableFuture<Boolean> future4 = new CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(foo, updateProperties, future4));
+
+        foo = new Foo("f1", "f2no", 9);
+        updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "INSERT");
+        final CompletableFuture<Boolean> future5 = new CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(foo, updateProperties, future5));
+
+        foo = new Foo("f1", "f3", 5);
+        updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "UPDATE");
+        final CompletableFuture<Boolean> future6 = new CompletableFuture<>();
+        jdbcSink.write(createMockFooRecord(foo, updateProperties, future6));
+
+
+        if (jdbcSink.jdbcSinkConfig.isUseTransactions()) {
+            if (jdbcSink.jdbcSinkConfig.isUseJdbcBatch()) {
+                Assert.assertTrue(future2.get(1, TimeUnit.SECONDS));
+                Assert.assertTrue(future3.get(1, TimeUnit.SECONDS));
+                Assert.assertTrue(future4.get(1, TimeUnit.SECONDS));
+                Assert.assertFalse(future5.get(1, TimeUnit.SECONDS));
+                Assert.assertFalse(future6.get(1, TimeUnit.SECONDS));
+                final int count = sqliteUtils.select("select 
field1,field2,field3 from "
+                        + tableName
+                        + " where (field1='f1' and field2='f21') or 
field1='f2' or field1='f3'", (r) -> {});
+                Assert.assertEquals(count, 2);
+
+            } else {
+                Assert.assertFalse(future2.get(1, TimeUnit.SECONDS));
+                Assert.assertFalse(future3.get(1, TimeUnit.SECONDS));
+                Assert.assertFalse(future4.get(1, TimeUnit.SECONDS));
+                Assert.assertFalse(future5.get(1, TimeUnit.SECONDS));
+                Assert.assertFalse(future6.get(1, TimeUnit.SECONDS));
+                final int count = sqliteUtils.select("select 
field1,field2,field3 from "
+                        + tableName
+                        + " where (field1='f1' and field2='f2') or field1='f2' 
or field1='f3'", (r) -> {});
+                Assert.assertEquals(count, 1);
+            }
+        } else {
+            Assert.assertTrue(future2.get(1, TimeUnit.SECONDS));
+            Assert.assertTrue(future3.get(1, TimeUnit.SECONDS));
+            Assert.assertTrue(future4.get(1, TimeUnit.SECONDS));
+            Assert.assertFalse(future5.get(1, TimeUnit.SECONDS));
+            Assert.assertFalse(future6.get(1, TimeUnit.SECONDS));
+            System.out.println("dump:\n" + sqliteUtils.dump("select 
field1,field2,field3 from " + tableName));
+
+            final int count = sqliteUtils.select("select field1,field2,field3 
from "
+                    + tableName
+                    + " where (field1='f1' and field2='f21') or field1='f2' or 
field1='f3'", (r) -> {
+                log.info("got {};{};{}", r.getString(1), r.getString(2), 
r.getInt(3));
+            });
+            Assert.assertEquals(count, 2);
+        }
+    }
+
+
     private static class MockKeyValueGenericRecord implements 
Record<GenericObject> {
 
         public MockKeyValueGenericRecord(Schema<KeyValue<GenericRecord, 
GenericRecord>> keyValueSchema) {
@@ -716,7 +834,7 @@ public class SqliteJdbcSinkTest {
                     if (key.equals("mykey2")) {
                         Assert.assertEquals(value, "thestring");
                     } else {
-                        throw new IllegalStateException();
+                        throw new IllegalStateException("got unexpected key " 
+ key);
                     }
                 });
                 if (nullValueAction == JdbcSinkConfig.NullValueAction.DELETE) {
@@ -731,4 +849,25 @@ public class SqliteJdbcSinkTest {
         }
     }
 
+    private Record<GenericObject> createMockFooRecord(Foo record, Map<String, 
String> actionProperties,
+                                                        
CompletableFuture<Boolean> future) {
+        Message<GenericObject> insertMessage = mock(MessageImpl.class);
+        GenericSchema<GenericRecord> genericAvroSchema;
+        AvroSchema<Foo> schema = 
AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+        byte[] insertBytes = schema.encode(record);
+
+        Record<GenericObject> insertRecord = 
PulsarRecord.<GenericObject>builder()
+                .message(insertMessage)
+                .topicName("fake_topic_name")
+                .ackFunction(() -> future.complete(true))
+                .failFunction(() -> future.complete(false))
+                .build();
+
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+        
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
+        when(insertMessage.getProperties()).thenReturn(actionProperties);
+        return insertRecord;
+    }
+
 }
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
index f8cdf76f7d3..9e3a3fe255d 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
+++ 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
@@ -100,6 +100,23 @@ public final class SqliteUtils {
         return count;
     }
 
+    public String dump(final String query) throws SQLException {
+        StringBuilder builder = new StringBuilder();
+        try (Connection connection = getConnection(true);
+             Statement stmt = connection.createStatement()) {
+            try (ResultSet rs = stmt.executeQuery(query)) {
+                while (rs.next()) {
+                    for (int i = 0; i < rs.getMetaData().getColumnCount(); 
i++) {
+                        builder.append(rs.getObject(i + 1));
+                        builder.append(";");
+                    }
+                    builder.append(System.lineSeparator());
+                }
+            }
+        }
+        return builder.toString();
+    }
+
     public void execute(String sql) throws SQLException {
         try (Connection connection = getConnection(true);
                 Statement stmt = connection.createStatement()) {
diff --git a/site2/docs/io-jdbc-sink.md b/site2/docs/io-jdbc-sink.md
index 4c9a473e027..63150517e28 100644
--- a/site2/docs/io-jdbc-sink.md
+++ b/site2/docs/io-jdbc-sink.md
@@ -15,20 +15,21 @@ The configuration of all JDBC sink connectors has the 
following properties.
 
 ### Property
 
-| Name        | Type   | Required | Default            | Description           
                                                                                
                                                                                
                                                                                
                                                                    |
-|-------------|--------|----------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `userName`  | String | false    | " " (empty string) | The username used to 
connect to the database specified by `jdbcUrl`.<br /><br />**Note: `userName` 
is case-sensitive.**                                                            
                                                                                
                                                                       |
-| `password`  | String | false    | " " (empty string) | The password used to 
connect to the database specified by `jdbcUrl`. <br /><br />**Note: `password` 
is case-sensitive.**                                                            
                                                                                
                                                                      |
-| `jdbcUrl`   | String | true     | " " (empty string) | The JDBC URL of the 
database that the connector connects to.                                        
                                                                                
                                                                                
                                                                      |
-| `tableName` | String | true     | " " (empty string) | The name of the table 
that the connector writes to.                                                   
                                                                                
                                                                                
                                                                    |
-| `nonKey`    | String | false    | " " (empty string) | A comma-separated 
list containing the fields used in updating events.                             
                                                                                
                                                                                
                                                                        |
-| `key`       | String | false    | " " (empty string) | A comma-separated 
list containing the fields used in `where` condition of updating and deleting 
events.                                                                         
                                                                                
                                                                          |
-| `timeoutMs` | int    | false    | 500                | The JDBC operation 
timeout in milliseconds.                                                        
                                                                                
                                                                                
                                                                       |
-| `batchSize` | int    | false    | 200                | The batch size of 
updates made to the database.                                                   
                                                                                
                                                                                
                                                                        |
-| `insertMode` | enum( INSERT,UPSERT,UPDATE) | false    | INSERT             | 
If it is configured as UPSERT, the sink uses upsert semantics rather than plain 
INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row 
or updating the existing row if there is a primary key constraint violation, 
which provides idempotence.                                                     
             |
-| `nullValueAction` | enum(FAIL, DELETE) | false    | FAIL               | How 
to handle records with NULL values. Possible options are `DELETE` or `FAIL`.    
                                                                                
                                                                                
                                                                                
      |
-| `useTransactions` | boolean | false    | true               | Enable 
transactions of the database.                                                   
                                                                                
                                                                                
                                                                                
   
+| Name        | Type   | Required | Default            | Description           
                                                                                
                                                                                
                                                                                
                                                                  |
+|-------------|--------|----------|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `userName`  | String | false    | " " (empty string) | The username used to 
connect to the database specified by `jdbcUrl`.<br /><br />**Note: `userName` 
is case-sensitive.**                                                            
                                                                                
                                                                     |
+| `password`  | String | false    | " " (empty string) | The password used to 
connect to the database specified by `jdbcUrl`. <br /><br />**Note: `password` 
is case-sensitive.**                                                            
                                                                                
                                                                    |
+| `jdbcUrl`   | String | true     | " " (empty string) | The JDBC URL of the 
database that the connector connects to.                                        
                                                                                
                                                                                
                                                                    |
+| `tableName` | String | true     | " " (empty string) | The name of the table 
that the connector writes to.                                                   
                                                                                
                                                                                
                                                                  |
+| `nonKey`    | String | false    | " " (empty string) | A comma-separated 
list containing the fields used in updating events.                             
                                                                                
                                                                                
                                                                      |
+| `key`       | String | false    | " " (empty string) | A comma-separated 
list containing the fields used in `where` condition of updating and deleting 
events.                                                                         
                                                                                
                                                                        |
+| `timeoutMs` | int    | false    | 500                | The JDBC operation 
timeout in milliseconds.                                                        
                                                                                
                                                                                
                                                                     |
+| `batchSize` | int    | false    | 200                | The batch size of 
updates made to the database.                                                   
                                                                                
                                                                                
                                                                      |
+| `insertMode` | enum( INSERT,UPSERT,UPDATE) | false    | INSERT             | 
If it is configured as UPSERT, the sink uses upsert semantics rather than plain 
INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row 
or updating the existing row if there is a primary key constraint violation, 
which provides idempotence.                                                     
           |
+| `nullValueAction` | enum(FAIL, DELETE) | false    | FAIL               | How 
to handle records with NULL values. Possible options are `DELETE` or `FAIL`.    
                                                                                
                                                                                
                                                                                
    |
+| `useTransactions` | boolean | false    | true               | Enable 
transactions of the database.                                                   
                                                                                
                                                                                
                                                                                
 
 | `excludeNonDeclaredFields` | boolean | false    | false              | All 
the table fields are discovered automatically. `excludeNonDeclaredFields` 
indicates if the table fields not explicitly listed in `nonKey` and `key` must 
be included in the query. By default all the table fields are included. To 
leverage of table fields defaults during insertion, it is suggested to set this 
value to `false`. |
+| `useJdbcBatch`    | boolean | false    | false              | Use the JDBC 
batch API. This option is suggested to improve write performance. |
 
 ### Example of ClickHouse
 

Reply via email to