poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r939677905
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,64 +336,24 @@ private void internalAsyncAddData(T data, AddDataCallback
callback, Object ctx){
}
- /**
- * Trigger write to bookie once, If the conditions are not met, nothing
will be done.
- */
- public void trigFlush(final boolean force, boolean byScheduleThreads){
- singleThreadExecutorForWrite.execute(() -> doTrigFlush(force,
byScheduleThreads));
- }
-
- private void doTrigFlush(boolean force, boolean byScheduleThreads){
- try {
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
- if (force) {
- doFlush();
- return;
- }
- if (byScheduleThreads) {
- doFlush();
- return;
- }
- AsyncAddArgs firstAsyncAddArgs =
flushContext.asyncAddArgsList.get(0);
- if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >=
batchedWriteMaxDelayInMillis) {
- doFlush();
- return;
- }
- if (this.flushContext.asyncAddArgsList.size() >=
batchedWriteMaxRecords) {
- doFlush();
- return;
- }
- if (this.bytesSize >= batchedWriteMaxSize) {
- doFlush();
- }
- } finally {
- if (byScheduleThreads) {
- nextTimingTrigger();
- }
- }
- }
-
private void doFlush(){
- // Combine data.
- ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
- ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
- ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix,
actualContent);
+ // Combine data cached by flushContext, and write to BK.
+ ByteBuf prefixByteFuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+ ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+ ByteBuf wholeByteBuf =
Unpooled.wrappedUnmodifiableBuffer(prefixByteFuf, contentByteBuf);
Review Comment:
> Why Unpooled and not PulsarByteBufAllocator?
The method `Unpooled.wrappedUnmodifiableBuffer` returns just a value object,
which is much lighter than what `Pooledxxx.compositeBuffer` returns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]