dlg99 commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r997545101
##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -213,63 +224,90 @@ protected enum MutationType {
private void flush() {
- // if not in flushing state, do flush, else return;
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
- if (log.isDebugEnabled()) {
- log.debug("Starting flush, queue size: {}",
incomingList.size());
- }
- if (!swapList.isEmpty()) {
- throw new IllegalStateException("swapList should be empty
since last flush. swapList.size: "
- + swapList.size());
- }
- synchronized (this) {
- List<Record<T>> tmpList;
- swapList.clear();
+ boolean needAnotherRound;
+ final Deque<Record<T>> swapList = new LinkedList<>();
+
+ synchronized (incomingList) {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting flush, queue size: {}",
incomingList.size());
+ }
+ final int actualBatchSize = batchSize > 0 ?
Math.min(incomingList.size(), batchSize) :
+ incomingList.size();
- tmpList = swapList;
- swapList = incomingList;
- incomingList = tmpList;
+ for (int i = 0; i < actualBatchSize; i++) {
+ swapList.add(incomingList.removeFirst());
+ }
+ needAnotherRound = batchSize > 0 && !incomingList.isEmpty() &&
incomingList.size() >= batchSize;
}
+ long start = System.nanoTime();
int count = 0;
try {
+ PreparedStatement currentBatch = null;
+ final List<Mutation> mutations = swapList
+ .stream()
+ .map(this::createMutation)
+ .collect(Collectors.toList());
// bind each record value
- for (Record<T> record : swapList) {
- final Mutation mutation = createMutation(record);
+ PreparedStatement statement;
+ for (Mutation mutation : mutations) {
switch (mutation.getType()) {
case DELETE:
- bindValue(deleteStatement, mutation);
- count += 1;
- deleteStatement.execute();
+ statement = deleteStatement;
break;
case UPDATE:
- bindValue(updateStatement, mutation);
- count += 1;
- updateStatement.execute();
+ statement = updateStatement;
break;
case INSERT:
- bindValue(insertStatement, mutation);
- count += 1;
- insertStatement.execute();
+ statement = insertStatement;
break;
case UPSERT:
- bindValue(upsertStatement, mutation);
- count += 1;
- upsertStatement.execute();
+ statement = upsertStatement;
break;
default:
String msg = String.format(
"Unsupported action %s, can be one of %s,
or not set which indicate %s",
mutation.getType(),
Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
}
+ bindValue(statement, mutation);
+ count += 1;
+ if (jdbcSinkConfig.isUseJdbcBatch()) {
+ if (currentBatch != null && statement != currentBatch)
{
+ executeBatch(swapList, currentBatch);
+ if (log.isDebugEnabled()) {
+ log.debug("Flushed {} messages in {} ms",
count, (System.nanoTime() - start) / 1000 / 1000);
+ }
+ start = System.nanoTime();
+ }
+ statement.addBatch();
+ currentBatch = statement;
+ } else {
+ statement.execute();
+ if (!jdbcSinkConfig.isUseTransactions()) {
+ swapList.removeFirst().ack();
+ }
+ }
}
- if (jdbcSinkConfig.isUseTransactions()) {
- connection.commit();
+
+ if (jdbcSinkConfig.isUseJdbcBatch()) {
+ executeBatch(swapList, currentBatch);
+ if (log.isDebugEnabled()) {
+ log.debug("Flushed {} messages in {} ms", count,
(System.nanoTime() - start) / 1000 / 1000);
+ }
+ } else {
+ if (jdbcSinkConfig.isUseTransactions()) {
+ connection.commit();
+ swapList.forEach(Record::ack);
+ }
Review Comment:
It looks like transactions and batches are mutually exclusive, yet the
transaction and batching logic is all interwoven here. I'd add two helper
methods, like "internalFlushBatch" and "internalFlush", so you can do something like:
```java
if (needToBatch) {
internalFlushBatch();
} else {
internalFlush();
}
```
##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -280,21 +318,56 @@ private void flush() {
}
}
- if (swapList.size() != count) {
- log.error("Update count {} not match total number of records
{}", count, swapList.size());
- }
-
- // finish flush
- if (log.isDebugEnabled()) {
- log.debug("Finish flush, queue size: {}", swapList.size());
- }
- swapList.clear();
isFlushing.set(false);
+ if (needAnotherRound) {
+ flush();
+ }
Review Comment:
I think this is not covered by tests (have I missed it?)
##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -76,15 +76,24 @@ public class JdbcSinkConfig implements Serializable {
@FieldDoc(
required = false,
defaultValue = "500",
- help = "The jdbc operation timeout in milliseconds"
+ help = "Enable batch mode by time. After timeoutMs milliseconds the
operations queue will be flushed."
)
private int timeoutMs = 500;
@FieldDoc(
required = false,
defaultValue = "200",
- help = "The batch size of updates made to the database"
+ help = "Enable batch mode by number of operations. This value is the
max number of operations "
+ + "batched in the same transaction/batch."
)
private int batchSize = 200;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "false",
+ help = "Use the JDBC batch API. This option is suggested to
improve writes performances."
Review Comment:
```suggestion
help = "Use the JDBC batch API. This option is suggested to
improve write performance."
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]