asafm commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r957510523
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -368,6 +369,31 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
                 .build());
     }
+    private static void ensureConfigIsAppropriate(ServiceConfiguration configuration){
+        if (configuration.isTransactionPendingAckBatchedWriteEnabled()){
+            if (configuration.getTransactionPendingAckBatchedWriteMaxRecords() < 10){
+                throw new IllegalArgumentException("Txn pending ack batched write max records suggestion at least 10");
Review Comment:
1. When the user gets this error, they need to know exactly which property caused it. I think it's a good idea to state the property name explicitly in the error message.
2. The word "suggestion" makes it sound like keeping this value at 10 or above is optional rather than required.
How about:
"Configuration field '{}' value must be at least 10 (value configured was '{}')"
Use the same phrasing for the rest of the error messages.
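A minimal sketch of that message shape (the `checkAtLeast` helper and the stand-alone class are illustrative only, not actual Pulsar code):

```java
// Illustrative sketch: shows the suggested error-message shape.
public class ConfigMessageExample {

    // checkAtLeast is a hypothetical helper, not part of ServiceConfiguration.
    static void checkAtLeast(String propertyName, long configuredValue, long minimum) {
        if (configuredValue < minimum) {
            throw new IllegalArgumentException(String.format(
                    "Configuration field '%s' value must be at least %d (value configured was '%d')",
                    propertyName, minimum, configuredValue));
        }
    }

    public static void main(String[] args) {
        try {
            checkAtLeast("transactionPendingAckBatchedWriteMaxRecords", 5, 10);
        } catch (IllegalArgumentException e) {
            // The message names the exact property and the offending value.
            System.out.println(e.getMessage());
        }
    }
}
```

With this shape the operator can go straight from the log line to the broker.conf property.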
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -368,6 +369,31 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
                 .build());
     }
+    private static void ensureConfigIsAppropriate(ServiceConfiguration configuration){
Review Comment:
1. The name is too generic. Other developers might say "Oh, this is where we make sure our config is appropriate, let's add our validation code in this method", and the method will quickly spin out of control and become one big messy hairball.
2. This logic lives inside a class that is central to Pulsar and already too big.
3. It doesn't follow the validation patterns used earlier in this constructor.
I suggest the following:
* Move this method into a new class called
`TransactionBatchedWriteValidator` (or perhaps a better name) in the package
`org.apache.pulsar.broker.validator`
* rename the method to `validate` (since the class name already encapsulates the full meaning)
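A rough sketch of what that could look like (`ConfigStub` is a stand-in for `ServiceConfiguration` so the example compiles on its own; the real validator would take the actual configuration object and live in `org.apache.pulsar.broker.validator`):

```java
// ConfigStub stands in for ServiceConfiguration, for illustration only.
class ConfigStub {
    boolean pendingAckBatchedWriteEnabled = true;
    int pendingAckBatchedWriteMaxRecords = 512;

    boolean isTransactionPendingAckBatchedWriteEnabled() { return pendingAckBatchedWriteEnabled; }
    int getTransactionPendingAckBatchedWriteMaxRecords() { return pendingAckBatchedWriteMaxRecords; }
}

// Sketch of the suggested dedicated validator class.
public class TransactionBatchedWriteValidator {

    // The class name carries the context, so "validate" is enough.
    public static void validate(ConfigStub configuration) {
        if (configuration.isTransactionPendingAckBatchedWriteEnabled()
                && configuration.getTransactionPendingAckBatchedWriteMaxRecords() < 10) {
            throw new IllegalArgumentException(
                    "Configuration field 'transactionPendingAckBatchedWriteMaxRecords' value must be"
                    + " at least 10 (value configured was '"
                    + configuration.getTransactionPendingAckBatchedWriteMaxRecords() + "')");
        }
    }

    public static void main(String[] args) {
        validate(new ConfigStub()); // 512 >= 10, passes silently
    }
}
```

The constructor in `PulsarService` would then only contain the one-line call `TransactionBatchedWriteValidator.validate(config)`, matching the existing validation style.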
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -374,46 +373,42 @@ private void doFlush(){
     /**
      * Release resources and cancel pending tasks.
      */
-    @Override
-    public void close() {
+    public CompletableFuture<Void> close() {
         // If batch feature is disabled, there is nothing to close, so set the stat only.
         if (!batchEnabled) {
             STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         // If other thread already called "close()", so do nothing.
         if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
-            return;
+            return CompletableFuture.completedFuture(null);
         }
+        CompletableFuture closeFuture = new CompletableFuture();
         // Cancel pending tasks and release resources.
         singleThreadExecutorForWrite.execute(() -> {
             try {
                 if (state == State.CLOSED) {
+                    closeFuture.complete(null);
Review Comment:
What is the scenario in which you can reach state = CLOSED? Only one thread
will be able to pass OPEN -> CLOSING and submit the close lambda on the single
thread executor. This is the only lambda that can convert CLOSING -> CLOSED.
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -374,46 +373,42 @@ private void doFlush(){
     /**
      * Release resources and cancel pending tasks.
      */
-    @Override
-    public void close() {
+    public CompletableFuture<Void> close() {
         // If batch feature is disabled, there is nothing to close, so set the stat only.
         if (!batchEnabled) {
             STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         // If other thread already called "close()", so do nothing.
         if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
-            return;
+            return CompletableFuture.completedFuture(null);
         }
+        CompletableFuture closeFuture = new CompletableFuture();
         // Cancel pending tasks and release resources.
         singleThreadExecutorForWrite.execute(() -> {
             try {
                 if (state == State.CLOSED) {
+                    closeFuture.complete(null);
                     return;
                 }
                 // If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should
                 // fail.
-                failureCallbackByContextAndRecycle(flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
+                failureCallbackByContextAndRecycle(flushContext,
Review Comment:
This should be `new TxLogBufferedWriterException("Transaction log buffered write is closed")`. A fenced exception belongs to ManagedLedger and is completely unrelated to the buffered writer closing.
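A sketch of a dedicated exception type along the lines the comment suggests (the class name and message follow the comment; where the real type would live in the coordinator module is an open question):

```java
// Hypothetical dedicated exception for the buffered writer, as suggested,
// instead of reusing ManagedLedgerException.ManagedLedgerFencedException.
public class TxLogBufferedWriterException extends Exception {
    public TxLogBufferedWriterException(String message) {
        super(message);
    }
}

// The close path would then read roughly:
// failureCallbackByContextAndRecycle(flushContext,
//         new TxLogBufferedWriterException("Transaction log buffered write is closed"));
```

A dedicated type also lets callers distinguish "writer closed" from genuine storage errors when handling the callback.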
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -374,46 +373,42 @@ private void doFlush(){
     /**
      * Release resources and cancel pending tasks.
      */
-    @Override
-    public void close() {
+    public CompletableFuture<Void> close() {
         // If batch feature is disabled, there is nothing to close, so set the stat only.
         if (!batchEnabled) {
             STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         // If other thread already called "close()", so do nothing.
         if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
-            return;
+            return CompletableFuture.completedFuture(null);
         }
+        CompletableFuture closeFuture = new CompletableFuture();
         // Cancel pending tasks and release resources.
         singleThreadExecutorForWrite.execute(() -> {
             try {
                 if (state == State.CLOSED) {
+                    closeFuture.complete(null);
                     return;
                 }
                 // If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should
                 // fail.
-                failureCallbackByContextAndRecycle(flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
+                failureCallbackByContextAndRecycle(flushContext,
+                        new ManagedLedgerException.ManagedLedgerFencedException(
+                                new Exception("Transaction log buffered write has closed")
+                        ));
                 // Cancel the timing task.
-                if (timeout == null) {
-                    log.error("Cancel timeout-task that schedule at fixed rate trig flush failure. The field-timeout"
-                            + " is null. managedLedger: " + managedLedger.getName());
-                } else if (timeout.isCancelled()) {
-                    // TODO How decisions the timer-task has been finished ?
-                    STATE_UPDATER.set(this, State.CLOSED);
-                } else {
-                    if (this.timeout.cancel()) {
-                        STATE_UPDATER.set(this, State.CLOSED);
-                    } else {
-                        // Cancel task failure, The state will stay at CLOSING.
-                        log.error("Cancel timeout-task that schedule at fixed rate trig flush failure. The state will"
-                                + " stay at CLOSING. managedLedger: " + managedLedger.getName());
-                    }
+                if (!timeout.isCancelled()){
+                    this.timeout.cancel();
                 }
+                STATE_UPDATER.set(this, State.CLOSED);
+                closeFuture.complete(null);
             } catch (Exception e){
                 log.error("Close Txn log buffered writer fail", e);
+                closeFuture.completeExceptionally(e);
Review Comment:
You can remove the log line above it, since you're already reporting the exception through the CompletableFuture.
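In other words, roughly (a simplified stand-in for the close path, not the actual class):

```java
import java.util.concurrent.CompletableFuture;

// Sketch: surface the failure only through the returned future; the caller
// decides whether and how to log it.
public class CloseFutureDemo {

    static CompletableFuture<Void> close(Runnable closeBody) {
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        try {
            closeBody.run();
            closeFuture.complete(null);
        } catch (Exception e) {
            // No log.error here: completeExceptionally already carries the cause.
            closeFuture.completeExceptionally(e);
        }
        return closeFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> f = close(() -> { throw new IllegalStateException("boom"); });
        System.out.println(f.isCompletedExceptionally()); // true
    }
}
```

Logging and completing exceptionally would otherwise report the same failure twice, once in the writer's log and once wherever the caller handles the future.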
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -368,6 +369,31 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
                 .build());
     }
+    private static void ensureConfigIsAppropriate(ServiceConfiguration configuration){
+        if (configuration.isTransactionPendingAckBatchedWriteEnabled()){
+            if (configuration.getTransactionPendingAckBatchedWriteMaxRecords() < 10){
+                throw new IllegalArgumentException("Txn pending ack batched write max records suggestion at least 10");
+            }
+            if (configuration.getTransactionPendingAckBatchedWriteMaxSize() < 1024 * 128){
Review Comment:
@codelipenghui I don't have enough context to judge - do the values validated here make sense?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +223,102 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            internalAsyncAddData(data, callback, ctx);
+        });
     }
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     * 1. Write the data cached in the queue to BK.
+     * 2. Direct write the large data to BK, this flush event will not record to Metrics.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
-        if (state == State.CLOSING || state == State.CLOSED){
-            callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
+        // Avoid missing callback, do failed callback when error occur before add data to the array.
+        boolean shouldCompensateCallBackWhenWriteFail = false;
+        try {
+            if (state == State.CLOSING || state == State.CLOSED){
+                callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
+                return;
+            }
+            int dataLength = dataSerializer.getSerializedSize(data);
+            if (dataLength >= batchedWriteMaxSize){
+                trigFlushByLargeSingleData();
+                ByteBuf byteBuf = dataSerializer.serialize(data);
+                shouldCompensateCallBackWhenWriteFail = false;
+                managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
+                        AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
+                return;
+            }
+            dataArray.add(data);
+            flushContext.addCallback(callback, ctx);
+            bytesSize += dataLength;
+            shouldCompensateCallBackWhenWriteFail = false;
+            trigFlushIfReachMaxRecordsOrMaxSize();
+        } catch (Exception e){
+            if (shouldCompensateCallBackWhenWriteFail){
+                log.error("Failed to add data asynchronously, and do failed callback when error occur before add"
+                        + " data to the array.", e);
+                callback.addFailed(new ManagedLedgerInterceptException(e), ctx);
Review Comment:
I'm looking at this now and I'm thinking the following: the code that called BufferedWriter asked you to add a record to the buffer, right? It also provided you with a callback so you can notify it whether the add failed or succeeded. Inside this code:
```java
} catch (Exception e){
    if (shouldCompensateCallBackWhenWriteFail){
        callback.addFailed(new ManagedLedgerInterceptException(e), ctx);
    } else {
        log.error("Failed to add data asynchronously", e);
    }
}
```
I read and re-read the code, and the way it's written makes it super hard to reason about exactly what will happen in every scenario. Maybe one of the flush methods failed, maybe it was `doFlush` - you have no idea in what state it failed, and whether you should call the callback or not.
My suggestion is to avoid that boolean altogether.
Perhaps it's best to wrap individual sections with try/catch and act accordingly. In the end you have two steps:
- add record
- flush if needed
If adding the record failed, you call the callback.
The flush needs to be self-contained with its own try/catch, so inside it you know exactly what failed and you can call the callbacks of each record.
I would only leave the outer try/catch in the internal-add to log the error, since it simply should never happen, but if it does (like an OOME) you know where it failed.
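A rough sketch of the shape being proposed (types and helpers are simplified stand-ins; the real method deals with `dataSerializer`, `flushContext`, BK writes, and so on):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the proposed structure: the add step and the flush step
// each own their failure handling, so no compensation boolean is needed.
public class BufferedAddSketch {

    interface AddDataCallback {
        void addFailed(Exception e, Object ctx);
    }

    private final List<String> dataArray = new ArrayList<>();
    private final List<Object[]> pendingCallbacks = new ArrayList<>();
    private final int maxRecords;

    BufferedAddSketch(int maxRecords) { this.maxRecords = maxRecords; }

    void internalAsyncAddData(String data, AddDataCallback callback, Object ctx) {
        // Step 1: add the record; on failure, notify this record's callback.
        try {
            dataArray.add(data);
            pendingCallbacks.add(new Object[]{callback, ctx});
        } catch (Exception e) {
            callback.addFailed(e, ctx);
            return;
        }
        // Step 2: flush if needed. doFlush is self-contained, so there is
        // nothing to compensate for here; an exception escaping this point
        // would be a genuine bug (e.g. OOME) and only needs logging.
        if (dataArray.size() >= maxRecords) {
            doFlush();
        }
    }

    private void doFlush() {
        try {
            // ... write dataArray to storage ...
            dataArray.clear();
            pendingCallbacks.clear();
        } catch (Exception e) {
            // The flush knows exactly which records failed: fail each callback.
            for (Object[] entry : pendingCallbacks) {
                ((AddDataCallback) entry[0]).addFailed(e, entry[1]);
            }
            pendingCallbacks.clear();
            dataArray.clear();
        }
    }
}
```

With this split, every failure path has exactly one owner, so it is always clear whether the caller's callback fires.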
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]