This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b52a92bbb4 [improve][io] JDBC sinks: implement JDBC Batch API (#18017)
7b52a92bbb4 is described below
commit 7b52a92bbb4dca4976233f2e99da289c38856a1a
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Oct 20 09:48:21 2022 +0200
[improve][io] JDBC sinks: implement JDBC Batch API (#18017)
* [improve][io] JDBC sinks: implement JDBC Batch API
* more tests and transactions support
* remove .db files
* doc
* fix batch results and thread safety
* add next flush test - fix doc - improve code readability
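For readers unfamiliar with it, the JDBC Batch API queues multiple parameter bindings on a single PreparedStatement and submits them in one round trip, which is where the write-performance gain in this change comes from. A minimal standalone sketch of the pattern (the SQLite URL, table, and values are illustrative, not taken from this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class JdbcBatchSketch {
        public static void main(String[] args) throws Exception {
            // In-memory SQLite; assumes a driver such as org.xerial:sqlite-jdbc on the classpath.
            try (Connection conn = DriverManager.getConnection("jdbc:sqlite::memory:")) {
                conn.setAutoCommit(false); // mirrors the sink's useTransactions=true mode
                try (Statement ddl = conn.createStatement()) {
                    ddl.execute("create table foo (field1 text, field3 int)");
                }
                try (PreparedStatement ps = conn.prepareStatement("insert into foo values (?, ?)")) {
                    for (int i = 0; i < 3; i++) {
                        ps.setString(1, "value" + i); // bind one record's values...
                        ps.setInt(2, i);
                        ps.addBatch();                // ...then queue them instead of executing now
                    }
                    int[] results = ps.executeBatch(); // one round trip; one status code per row
                    conn.commit();                     // or rollback() if a batch item failed
                    System.out.println(results.length + " rows submitted");
                }
            }
        }
    }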
---
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 174 ++++++++----
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 20 +-
.../pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java | 35 +++
.../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 307 +++++++++++++++------
.../org/apache/pulsar/io/jdbc/SqliteUtils.java | 17 ++
site2/docs/io-jdbc-sink.md | 27 +-
6 files changed, 433 insertions(+), 147 deletions(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 74a19b7b187..06beaaacf9e 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -23,7 +23,13 @@ import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -32,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
@@ -64,8 +71,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
protected JdbcUtils.TableDefinition tableDefinition;
// for flush
- private List<Record<T>> incomingList;
- private List<Record<T>> swapList;
+ private Deque<Record<T>> incomingList;
private AtomicBoolean isFlushing;
private int batchSize;
private ScheduledExecutorService flushExecutor;
@@ -73,6 +79,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
jdbcSinkConfig = JdbcSinkConfig.load(config);
+ jdbcSinkConfig.validate();
jdbcUrl = jdbcSinkConfig.getJdbcUrl();
if (jdbcSinkConfig.getJdbcUrl() == null) {
@@ -100,12 +107,13 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
- incomingList = Lists.newArrayList();
- swapList = Lists.newArrayList();
+ incomingList = new LinkedList<>();
isFlushing = new AtomicBoolean(false);
flushExecutor = Executors.newScheduledThreadPool(1);
- flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+ if (timeoutMs > 0) {
+ flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+ }
}
private void initStatement() throws Exception {
@@ -173,11 +181,14 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
@Override
public void write(Record<T> record) throws Exception {
int number;
- synchronized (this) {
+ synchronized (incomingList) {
incomingList.add(record);
number = incomingList.size();
}
- if (number == batchSize) {
+ if (batchSize > 0 && number >= batchSize) {
+ if (log.isDebugEnabled()) {
+ log.debug("flushing by batches, hit batch size {}", batchSize);
+ }
flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
}
}
@@ -220,49 +231,46 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private void flush() {
- // if not in flushing state, do flush, else return;
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
- if (log.isDebugEnabled()) {
- log.debug("Starting flush, queue size: {}", incomingList.size());
- }
- if (!swapList.isEmpty()) {
- throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
- + swapList.size());
- }
- synchronized (this) {
- List<Record<T>> tmpList;
- swapList.clear();
+ boolean needAnotherRound;
+ final Deque<Record<T>> swapList = new LinkedList<>();
+
+ synchronized (incomingList) {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting flush, queue size: {}", incomingList.size());
+ }
+ final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
+ incomingList.size();
- tmpList = swapList;
- swapList = incomingList;
- incomingList = tmpList;
+ for (int i = 0; i < actualBatchSize; i++) {
+ swapList.add(incomingList.removeFirst());
+ }
+ needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
}
+ long start = System.nanoTime();
int count = 0;
try {
+ PreparedStatement currentBatch = null;
+ final List<Mutation> mutations = swapList
+ .stream()
+ .map(this::createMutation)
+ .collect(Collectors.toList());
// bind each record value
- for (Record<T> record : swapList) {
- final Mutation mutation = createMutation(record);
+ PreparedStatement statement;
+ for (Mutation mutation : mutations) {
switch (mutation.getType()) {
case DELETE:
- bindValue(deleteStatement, mutation);
- count += 1;
- deleteStatement.execute();
+ statement = deleteStatement;
break;
case UPDATE:
- bindValue(updateStatement, mutation);
- count += 1;
- updateStatement.execute();
+ statement = updateStatement;
break;
case INSERT:
- bindValue(insertStatement, mutation);
- count += 1;
- insertStatement.execute();
+ statement = insertStatement;
break;
case UPSERT:
- bindValue(upsertStatement, mutation);
- count += 1;
- upsertStatement.execute();
+ statement = upsertStatement;
break;
default:
String msg = String.format(
@@ -270,13 +278,34 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
}
+ bindValue(statement, mutation);
+ count += 1;
+ if (jdbcSinkConfig.isUseJdbcBatch()) {
+ if (currentBatch != null && statement != currentBatch) {
+ internalFlushBatch(swapList, currentBatch, count, start);
+ start = System.nanoTime();
+ }
+ statement.addBatch();
+ currentBatch = statement;
+ } else {
+ statement.execute();
+ if (!jdbcSinkConfig.isUseTransactions()) {
+ swapList.removeFirst().ack();
+ }
+ }
}
- if (jdbcSinkConfig.isUseTransactions()) {
- connection.commit();
+
+ if (jdbcSinkConfig.isUseJdbcBatch()) {
+ internalFlushBatch(swapList, currentBatch, count, start);
+ } else {
+ internalFlush(swapList);
}
- swapList.forEach(Record::ack);
} catch (Exception e) {
- log.error("Got exception {}", e.getMessage(), e);
+ log.error("Got exception {} after {} ms, failing {} messages",
+ e.getMessage(),
+ (System.nanoTime() - start) / 1000 / 1000,
+ swapList.size(),
+ e);
swapList.forEach(Record::fail);
try {
if (jdbcSinkConfig.isUseTransactions()) {
@@ -287,16 +316,10 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
}
}
- if (swapList.size() != count) {
- log.error("Update count {} not match total number of records {}", count, swapList.size());
- }
-
- // finish flush
- if (log.isDebugEnabled()) {
- log.debug("Finish flush, queue size: {}", swapList.size());
- }
- swapList.clear();
isFlushing.set(false);
+ if (needAnotherRound) {
+ flush();
+ }
} else {
if (log.isDebugEnabled()) {
log.debug("Already in flushing state, will not flush, queue
size: {}", incomingList.size());
@@ -304,4 +327,59 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
}
}
+ private void internalFlush(Deque<Record<T>> swapList) throws SQLException {
+ if (jdbcSinkConfig.isUseTransactions()) {
+ connection.commit();
+ swapList.forEach(Record::ack);
+ }
+ }
+
+ private void internalFlushBatch(Deque<Record<T>> swapList, PreparedStatement currentBatch, int count, long start) throws SQLException {
+ executeBatch(swapList, currentBatch);
+ if (log.isDebugEnabled()) {
+ log.debug("Flushed {} messages in {} ms", count, (System.nanoTime() - start) / 1000 / 1000);
+ }
+ }
+
+ private void executeBatch(Deque<Record<T>> swapList, PreparedStatement statement) throws SQLException {
+ final int[] results = statement.executeBatch();
+ Map<Integer, Integer> failuresMapping = null;
+ final boolean useTransactions = jdbcSinkConfig.isUseTransactions();
+
+ for (int r: results) {
+ if (isBatchItemFailed(r)) {
+ if (failuresMapping == null) {
+ failuresMapping = new HashMap<>();
+ }
+ final Integer current = failuresMapping.computeIfAbsent(r, code -> 1);
+ failuresMapping.put(r, current + 1);
+ }
+ }
+ if (failuresMapping == null || failuresMapping.isEmpty()) {
+ if (useTransactions) {
+ connection.commit();
+ }
+ for (int r: results) {
+ swapList.removeFirst().ack();
+ }
+ } else {
+ if (useTransactions) {
+ connection.rollback();
+ }
+ for (int r: results) {
+ swapList.removeFirst().fail();
+ }
+ String msg = "Batch failed, got error results (error_code->count): " + failuresMapping;
+ // throwing an exception here means the main loop cycle will nack the messages in the next batch
+ throw new SQLException(msg);
+ }
+ }
+
+ private static boolean isBatchItemFailed(int returnCode) {
+ if (returnCode == Statement.SUCCESS_NO_INFO || returnCode >= 0) {
+ return false;
+ }
+ return true;
+ }
+
}
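For reference, the return codes handled in executeBatch/isBatchItemFailed above follow the JDBC contract for Statement.executeBatch(): each array element is either a non-negative update count, Statement.SUCCESS_NO_INFO (-2), or Statement.EXECUTE_FAILED (-3). A standalone restatement of that check (equivalent logic; the class name is hypothetical):

    import java.sql.Statement;

    final class BatchReturnCodes {
        // A non-negative value is an update count; SUCCESS_NO_INFO (-2) is success
        // without a count; any other negative value (e.g. EXECUTE_FAILED, -3) is a failure.
        static boolean batchItemFailed(int returnCode) {
            return returnCode != Statement.SUCCESS_NO_INFO && returnCode < 0;
        }
    }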
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 74f339bd443..ac9a36be796 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -87,15 +87,24 @@ public class JdbcSinkConfig implements Serializable {
@FieldDoc(
required = false,
defaultValue = "500",
- help = "The jdbc operation timeout in milliseconds"
+ help = "Enable batch mode by time. After timeoutMs milliseconds the
operations queue will be flushed."
)
private int timeoutMs = 500;
@FieldDoc(
required = false,
defaultValue = "200",
- help = "The batch size of updates made to the database"
+ help = "Enable batch mode by number of operations. This value is the
max number of operations "
+ + "batched in the same transaction/batch."
)
private int batchSize = 200;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "false",
+ help = "Use the JDBC batch API. This option is suggested to
improve write performance."
+ )
+ private boolean useJdbcBatch = false;
+
@FieldDoc(
required = false,
defaultValue = "true",
@@ -141,4 +150,11 @@ public class JdbcSinkConfig implements Serializable {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(new ObjectMapper().writeValueAsString(map), JdbcSinkConfig.class);
}
+
+ public void validate() {
+ if (timeoutMs <= 0 && batchSize <= 0) {
+ throw new IllegalArgumentException("timeoutMs or batchSize must be set to a positive value.");
+ }
+ }
+
}
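Taken together, the config changes mean a flush now triggers on whichever of batchSize or timeoutMs fires first, validate() rejects configurations where both are non-positive, and useJdbcBatch switches the flush path to addBatch()/executeBatch(). A hypothetical config map in the same shape the tests below pass to the sink's open() (URL and table name are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    final class JdbcSinkConfigExample {
        static Map<String, Object> batchingConfig() {
            Map<String, Object> conf = new HashMap<>();
            conf.put("jdbcUrl", "jdbc:sqlite:/tmp/example.db"); // illustrative
            conf.put("tableName", "foo");                       // illustrative
            conf.put("key", "field3");
            conf.put("nonKey", "field1,field2");
            conf.put("batchSize", 200);     // flush after 200 queued operations...
            conf.put("timeoutMs", 500);     // ...or after 500 ms, whichever comes first
            conf.put("useJdbcBatch", true); // new: submit via the JDBC Batch API
            return conf;
        }
    }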
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java
new file mode 100644
index 00000000000..012b37bacec
--- /dev/null
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkBatchTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import java.util.Map;
+
+/**
+ * Jdbc Sink test with the JDBC Batch API enabled
+ */
+@Slf4j
+public class SqliteJdbcSinkBatchTest extends SqliteJdbcSinkTest {
+
+ @Override
+ protected void configure(Map<String, Object> configuration) {
+ configuration.put("useJdbcBatch", "true");
+ }
+}
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 030a9b4187b..38cdc29a1c9 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -25,17 +25,22 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -45,9 +50,7 @@ import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
@@ -72,6 +75,8 @@ public class SqliteJdbcSinkTest {
* A Simple class to test jdbc class
*/
@Data
+ @NoArgsConstructor
+ @AllArgsConstructor
public static class Foo {
private String field1;
private String field2;
@@ -96,17 +101,26 @@ public class SqliteJdbcSinkTest {
// prepare data for delete sql
String deleteSql = "insert into " + tableName + " values('ValueOfField5', 'ValueOfField5', 5)";
sqliteUtils.execute(deleteSql);
- Map<String, Object> conf;
+ restartSinkWithConfig(null);
+ }
private void restartSinkWithConfig(Map<String, Object> additional) throws Exception {
+ if (jdbcSink != null) {
+ jdbcSink.close();
+ }
String jdbcUrl = sqliteUtils.sqliteUri();
- conf = Maps.newHashMap();
+ Map<String, Object> conf = Maps.newHashMap();
conf.put("jdbcUrl", jdbcUrl);
conf.put("tableName", tableName);
conf.put("key", "field3");
conf.put("nonKey", "field1,field2");
// change batchSize to 1, to flush on each write.
conf.put("batchSize", 1);
+ if (additional != null) {
+ conf.putAll(additional);
+ }
+ configure(conf);
jdbcSink = new SqliteJdbcAutoSchemaSink();
@@ -114,6 +128,9 @@ public class SqliteJdbcSinkTest {
jdbcSink.open(conf, null);
}
+ protected void configure(Map<String, Object> configuration) {
+ }
+
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
jdbcSink.close();
@@ -121,7 +138,7 @@ public class SqliteJdbcSinkTest {
}
private void testOpenAndWriteSinkNullValue(Map<String, String> actionProperties) throws Exception {
- Message<GenericObject> insertMessage = mock(MessageImpl.class);
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Foo insertObj = new Foo();
@@ -129,26 +146,8 @@ public class SqliteJdbcSinkTest {
// Not setting field2
// Field1 is the key and field3 is used for selecting records
insertObj.setField3(3);
- AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
-
- byte[] insertBytes = schema.encode(insertObj);
- CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
- .message(insertMessage)
- .topicName("fake_topic_name")
- .ackFunction(() -> future.complete(null))
- .build();
-
- genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
- when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
- when(insertMessage.getProperties()).thenReturn(actionProperties);
- log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
- insertObj.toString(),
- insertMessage.getValue().toString(),
- insertRecord.getValue().toString());
-
- // write should success.
- jdbcSink.write(insertRecord);
+ final Record<GenericObject> record = createMockFooRecord(insertObj, actionProperties, future);
+ jdbcSink.write(record);
log.info("executed write");
// sleep to wait backend flush complete
future.get(1, TimeUnit.SECONDS);
@@ -165,33 +164,16 @@ public class SqliteJdbcSinkTest {
}
private void testOpenAndWriteSinkJson(Map<String, String> actionProperties) throws Exception {
- Message<GenericObject> insertMessage = mock(MessageImpl.class);
- GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Foo insertObj = new Foo();
insertObj.setField1("ValueOfField1");
insertObj.setField2("ValueOfField2");
insertObj.setField3(3);
- JSONSchema<Foo> schema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
-
- byte[] insertBytes = schema.encode(insertObj);
- CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
- .message(insertMessage)
- .topicName("fake_topic_name")
- .ackFunction(() -> future.complete(null))
- .build();
-
- GenericSchema<GenericRecord> decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo());
- when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
- when(insertMessage.getProperties()).thenReturn(actionProperties);
- log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
- insertObj.toString(),
- insertMessage.getValue().toString(),
- insertRecord.getValue().toString());
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ final Record<GenericObject> record = createMockFooRecord(insertObj, actionProperties, future);
// write should success.
- jdbcSink.write(insertRecord);
+ jdbcSink.write(record);
log.info("executed write");
// sleep to wait backend flush complete
future.get(1, TimeUnit.SECONDS);
@@ -216,26 +198,11 @@ public class SqliteJdbcSinkTest {
// Not setting field2
// Field1 is the key and field3 is used for selecting records
insertObj.setField3(3);
- JSONSchema<Foo> schema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
-
- byte[] insertBytes = schema.encode(insertObj);
- CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
- .message(insertMessage)
- .topicName("fake_topic_name")
- .ackFunction(() -> future.complete(null))
- .build();
-
- GenericSchema<GenericRecord> decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo());
- when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
- when(insertMessage.getProperties()).thenReturn(actionProperties);
- log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
- insertObj.toString(),
- insertMessage.getValue().toString(),
- insertRecord.getValue().toString());
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ final Record<GenericObject> record = createMockFooRecord(insertObj, actionProperties, future);
// write should success.
- jdbcSink.write(insertRecord);
+ jdbcSink.write(record);
log.info("executed write");
// sleep to wait backend flush complete
@@ -260,26 +227,11 @@ public class SqliteJdbcSinkTest {
insertObj.setField1("ValueOfField1");
insertObj.setField2("ValueOfField2");
insertObj.setField3(3);
- AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
- byte[] insertBytes = schema.encode(insertObj);
- CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
- .message(insertMessage)
- .topicName("fake_topic_name")
- .ackFunction(() -> future.complete(null))
- .build();
-
- genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
- when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
- when(insertMessage.getProperties()).thenReturn(actionProperties);
- log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
- insertObj.toString(),
- insertMessage.getValue().toString(),
- insertRecord.getValue().toString());
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ final Record<GenericObject> record = createMockFooRecord(insertObj, actionProperties, future);
// write should success.
- jdbcSink.write(insertRecord);
+ jdbcSink.write(record);
log.info("executed write");
// sleep to wait backend flush complete
future.get(1, TimeUnit.SECONDS);
@@ -342,7 +294,7 @@ public class SqliteJdbcSinkTest {
byte[] updateBytes = schema.encode(updateObj);
Message<GenericObject> updateMessage = mock(MessageImpl.class);
- CompletableFuture<Void> future = new CompletableFuture<>();
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
Record<GenericObject> updateRecord = PulsarRecord.<GenericObject>builder()
.message(updateMessage)
.topicName("fake_topic_name")
@@ -383,7 +335,7 @@ public class SqliteJdbcSinkTest {
byte[] deleteBytes = schema.encode(deleteObj);
Message<GenericObject> deleteMessage = mock(MessageImpl.class);
- CompletableFuture<Void> future = new CompletableFuture<>();
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
Record<GenericObject> deleteRecord = PulsarRecord.<GenericObject>builder()
.message(deleteMessage)
.topicName("fake_topic_name")
@@ -409,6 +361,172 @@ public class SqliteJdbcSinkTest {
Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) -> {}), 0);
}
+ @Test
+ public void testBatchMode() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put("batchSize", 3);
+ config.put("timeoutMs", 0);
+ restartSinkWithConfig(config);
+ Foo updateObj = new Foo();
+ updateObj.setField1("f1");
+ updateObj.setField2("f12");
+ updateObj.setField3(1);
+ Map<String, String> updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "INSERT");
+ final CompletableFuture<Boolean> futureByEntries1 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(updateObj, updateProperties, futureByEntries1));
+ Assert.assertThrows(TimeoutException.class, () -> futureByEntries1.get(1, TimeUnit.SECONDS));
+ final CompletableFuture<Boolean> futureByEntries2 = new CompletableFuture<>();
+ updateProperties.put("ACTION", "UPDATE");
+ updateObj.setField2("f13");
+ jdbcSink.write(createMockFooRecord(updateObj, updateProperties, futureByEntries2));
+ Assert.assertThrows(TimeoutException.class, () -> futureByEntries1.get(1, TimeUnit.SECONDS));
+ Assert.assertThrows(TimeoutException.class, () -> futureByEntries2.get(1, TimeUnit.SECONDS));
+ updateObj.setField2("f14");
+ final CompletableFuture<Boolean> futureByEntries3 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(updateObj, updateProperties, futureByEntries3));
+ futureByEntries1.get(1, TimeUnit.SECONDS);
+ futureByEntries2.get(1, TimeUnit.SECONDS);
+ futureByEntries3.get(1, TimeUnit.SECONDS);
+
+ config.put("batchSize", 0);
+ config.put("timeoutMs", TimeUnit.SECONDS.toMillis(3));
+ restartSinkWithConfig(config);
+ final CompletableFuture<Boolean> futureByTime = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(updateObj, updateProperties, futureByTime));
+ Assert.assertThrows(TimeoutException.class, () -> futureByTime.get(1, TimeUnit.SECONDS));
+ futureByTime.get(3, TimeUnit.SECONDS);
+
+ }
+
+ /**
+ * Verify that if the flush has finished but the incoming record list size is equal
+ * to or greater than the batch size,
+ * the next flush is immediately triggered (without any other writes).
+ * @throws Exception
+ */
+ @Test
+ public void testBatchModeContinueFlushing() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put("batchSize", 1);
+ config.put("timeoutMs", 0);
+ restartSinkWithConfig(config);
+ // block the auto flushing mechanism
+ FieldUtils.writeField(jdbcSink, "isFlushing", new AtomicBoolean(true), true);
+ Foo updateObj = new Foo("f1", "f12", 1);
+ Map<String, String> updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "INSERT");
+ final CompletableFuture<Boolean> futureByEntries1 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(updateObj, updateProperties, futureByEntries1));
+ Assert.assertThrows(TimeoutException.class, () -> futureByEntries1.get(1, TimeUnit.SECONDS));
+
+ FieldUtils.writeField(jdbcSink, "isFlushing", new AtomicBoolean(false), true);
+
+ updateObj = new Foo("f2", "f12", 1);
+ updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "INSERT");
+ final CompletableFuture<Boolean> futureByEntries2 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(updateObj, updateProperties, futureByEntries2));
+
+ futureByEntries1.get(1, TimeUnit.SECONDS);
+ futureByEntries2.get(1, TimeUnit.SECONDS);
+ }
+
+ @DataProvider(name = "useTransactions")
+ public Object[] useTransactions() {
+ return Arrays.asList(true, false).toArray();
+ }
+
+ @Test(dataProvider = "useTransactions")
+ public void testBatchModeFailures(boolean useTransactions) throws Exception {
+ jdbcSink.close();
+ jdbcSink = null;
+ sqliteUtils.execute("delete from " + tableName);
+ restartSinkWithConfig(null);
+ Foo foo = new Foo("f1", "f2", 1);
+ Map<String, String> updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "INSERT");
+ final CompletableFuture<Boolean> future0 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(foo, updateProperties, future0));
+ Assert.assertTrue(future0.get());
+
+ Map<String, Object> config = new HashMap<>();
+ config.put("batchSize", 5);
+ config.put("useTransactions", useTransactions);
+ restartSinkWithConfig(config);
+
+ foo = new Foo("f2", "f2", 2);
+ updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "INSERT");
+ final CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(foo, updateProperties, future2));
+
+ foo = new Foo("f3", "f2", 3);
+ updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "INSERT");
+ final CompletableFuture<Boolean> future3 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(foo, updateProperties, future3));
+
+ foo = new Foo("f1", "f21", 11);
+ updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "UPDATE");
+ final CompletableFuture<Boolean> future4 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(foo, updateProperties, future4));
+
+ foo = new Foo("f1", "f2no", 9);
+ updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "INSERT");
+ final CompletableFuture<Boolean> future5 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(foo, updateProperties, future5));
+
+ foo = new Foo("f1", "f3", 5);
+ updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "UPDATE");
+ final CompletableFuture<Boolean> future6 = new CompletableFuture<>();
+ jdbcSink.write(createMockFooRecord(foo, updateProperties, future6));
+
+
+ if (jdbcSink.jdbcSinkConfig.isUseTransactions()) {
+ if (jdbcSink.jdbcSinkConfig.isUseJdbcBatch()) {
+ Assert.assertTrue(future2.get(1, TimeUnit.SECONDS));
+ Assert.assertTrue(future3.get(1, TimeUnit.SECONDS));
+ Assert.assertTrue(future4.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future5.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future6.get(1, TimeUnit.SECONDS));
+ final int count = sqliteUtils.select("select field1,field2,field3 from "
+ + tableName
+ + " where (field1='f1' and field2='f21') or
field1='f2' or field1='f3'", (r) -> {});
+ Assert.assertEquals(count, 2);
+
+ } else {
+ Assert.assertFalse(future2.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future3.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future4.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future5.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future6.get(1, TimeUnit.SECONDS));
+ final int count = sqliteUtils.select("select field1,field2,field3 from "
+ + tableName
+ + " where (field1='f1' and field2='f2') or field1='f2'
or field1='f3'", (r) -> {});
+ Assert.assertEquals(count, 1);
+ }
+ } else {
+ Assert.assertTrue(future2.get(1, TimeUnit.SECONDS));
+ Assert.assertTrue(future3.get(1, TimeUnit.SECONDS));
+ Assert.assertTrue(future4.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future5.get(1, TimeUnit.SECONDS));
+ Assert.assertFalse(future6.get(1, TimeUnit.SECONDS));
+ System.out.println("dump:\n" + sqliteUtils.dump("select field1,field2,field3 from " + tableName));
+
+ final int count = sqliteUtils.select("select field1,field2,field3 from "
+ + tableName
+ + " where (field1='f1' and field2='f21') or field1='f2' or
field1='f3'", (r) -> {
+ log.info("got {};{};{}", r.getString(1), r.getString(2), r.getInt(3));
+ });
+ Assert.assertEquals(count, 2);
+ }
+ }
+
+
private static class MockKeyValueGenericRecord implements Record<GenericObject> {
public MockKeyValueGenericRecord(Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema) {
@@ -716,7 +834,7 @@ public class SqliteJdbcSinkTest {
if (key.equals("mykey2")) {
Assert.assertEquals(value, "thestring");
} else {
- throw new IllegalStateException();
+ throw new IllegalStateException("got unexpected key " + key);
}
});
if (nullValueAction == JdbcSinkConfig.NullValueAction.DELETE) {
@@ -731,4 +849,25 @@ public class SqliteJdbcSinkTest {
}
}
+ private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
+ CompletableFuture<Boolean> future) {
+ Message<GenericObject> insertMessage = mock(MessageImpl.class);
+ GenericSchema<GenericRecord> genericAvroSchema;
+ AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+ byte[] insertBytes = schema.encode(record);
+
+ Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
+ .message(insertMessage)
+ .topicName("fake_topic_name")
+ .ackFunction(() -> future.complete(true))
+ .failFunction(() -> future.complete(false))
+ .build();
+
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+ when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
+ when(insertMessage.getProperties()).thenReturn(actionProperties);
+ return insertRecord;
+ }
+
}
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
index f8cdf76f7d3..9e3a3fe255d 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
@@ -100,6 +100,23 @@ public final class SqliteUtils {
return count;
}
+ public String dump(final String query) throws SQLException {
+ StringBuilder builder = new StringBuilder();
+ try (Connection connection = getConnection(true);
+ Statement stmt = connection.createStatement()) {
+ try (ResultSet rs = stmt.executeQuery(query)) {
+ while (rs.next()) {
+ for (int i = 0; i < rs.getMetaData().getColumnCount(); i++) {
+ builder.append(rs.getObject(i + 1));
+ builder.append(";");
+ }
+ builder.append(System.lineSeparator());
+ }
+ }
+ }
+ return builder.toString();
+ }
+
public void execute(String sql) throws SQLException {
try (Connection connection = getConnection(true);
Statement stmt = connection.createStatement()) {
diff --git a/site2/docs/io-jdbc-sink.md b/site2/docs/io-jdbc-sink.md
index 4c9a473e027..63150517e28 100644
--- a/site2/docs/io-jdbc-sink.md
+++ b/site2/docs/io-jdbc-sink.md
@@ -15,20 +15,21 @@ The configuration of all JDBC sink connectors has the following properties.
### Property
-| Name | Type | Required | Default | Description |
-|-------------|--------|----------|--------------------|-------------------------------------------------------------------------------------------|
-| `userName` | String | false | " " (empty string) | The username used to connect to the database specified by `jdbcUrl`.<br /><br />**Note: `userName` is case-sensitive.** |
-| `password` | String | false | " " (empty string) | The password used to connect to the database specified by `jdbcUrl`. <br /><br />**Note: `password` is case-sensitive.** |
-| `jdbcUrl` | String | true | " " (empty string) | The JDBC URL of the database that the connector connects to. |
-| `tableName` | String | true | " " (empty string) | The name of the table that the connector writes to. |
-| `nonKey` | String | false | " " (empty string) | A comma-separated list containing the fields used in updating events. |
-| `key` | String | false | " " (empty string) | A comma-separated list containing the fields used in `where` condition of updating and deleting events. |
-| `timeoutMs` | int | false | 500 | The JDBC operation timeout in milliseconds. |
-| `batchSize` | int | false | 200 | The batch size of updates made to the database. |
-| `insertMode` | enum( INSERT,UPSERT,UPDATE) | false | INSERT | If it is configured as UPSERT, the sink uses upsert semantics rather than plain INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence. |
-| `nullValueAction` | enum(FAIL, DELETE) | false | FAIL | How to handle records with NULL values. Possible options are `DELETE` or `FAIL`. |
-| `useTransactions` | boolean | false | true | Enable transactions of the database. |
+| Name | Type | Required | Default | Description |
+|-------------|--------|----------|--------------------|-------------------------------------------------------------------------------------------|
+| `userName` | String | false | " " (empty string) | The username used to connect to the database specified by `jdbcUrl`.<br /><br />**Note: `userName` is case-sensitive.** |
+| `password` | String | false | " " (empty string) | The password used to connect to the database specified by `jdbcUrl`. <br /><br />**Note: `password` is case-sensitive.** |
+| `jdbcUrl` | String | true | " " (empty string) | The JDBC URL of the database that the connector connects to. |
+| `tableName` | String | true | " " (empty string) | The name of the table that the connector writes to. |
+| `nonKey` | String | false | " " (empty string) | A comma-separated list containing the fields used in updating events. |
+| `key` | String | false | " " (empty string) | A comma-separated list containing the fields used in `where` condition of updating and deleting events. |
+| `timeoutMs` | int | false | 500 | The JDBC operation timeout in milliseconds. |
+| `batchSize` | int | false | 200 | The batch size of updates made to the database. |
+| `insertMode` | enum( INSERT,UPSERT,UPDATE) | false | INSERT | If it is configured as UPSERT, the sink uses upsert semantics rather than plain INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence. |
+| `nullValueAction` | enum(FAIL, DELETE) | false | FAIL | How to handle records with NULL values. Possible options are `DELETE` or `FAIL`. |
+| `useTransactions` | boolean | false | true | Enable transactions of the database. |
| `excludeNonDeclaredFields` | boolean | false | false | All the table fields are discovered automatically. `excludeNonDeclaredFields` indicates if the table fields not explicitly listed in `nonKey` and `key` must be included in the query. By default all the table fields are included. To leverage table field defaults during insertion, it is suggested to set this value to `false`. |
+| `useJdbcBatch` | boolean | false | false | Use the JDBC batch API. This option is suggested to improve write performance. |
### Example of ClickHouse