poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r948089858


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,138 +353,65 @@ private void internalAsyncAddData(T data, 
AddDataCallback callback, Object ctx){
 
     }
 
-    /**
-     * Trigger write to bookie once, If the conditions are not met, nothing 
will be done.
-     */
-    public void trigFlush(final boolean force, boolean byScheduleThreads){
-        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force, 
byScheduleThreads));
-    }
-
-    private void doTrigFlush(boolean force, boolean byScheduleThreads){
-        try {
-            if (flushContext.asyncAddArgsList.isEmpty()) {
-                return;
-            }
-            if (force) {
-                doFlush();
-                return;
-            }
-            if (byScheduleThreads) {
-                doFlush();
-                return;
-            }
-            AsyncAddArgs firstAsyncAddArgs = 
flushContext.asyncAddArgsList.get(0);
-            if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= 
batchedWriteMaxDelayInMillis) {
-                doFlush();
-                return;
-            }
-            if (this.flushContext.asyncAddArgsList.size() >= 
batchedWriteMaxRecords) {
-                doFlush();
-                return;
-            }
-            if (this.bytesSize >= batchedWriteMaxSize) {
-                doFlush();
-            }
-        } finally {
-            if (byScheduleThreads) {
-                nextTimingTrigger();
-            }
-        }
-    }
-
     private void doFlush(){
-        // Combine data.
-        ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
-        ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
-        ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix, 
actualContent);
-        // We need to release this pairByteBuf after Managed ledger async add 
callback. Just holds by FlushContext.
-        this.flushContext.byteBuf = pairByteBuf;
-        // Flush.
+        // Combine data cached by flushContext, and write to BK.
+        ByteBuf prefixByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+        ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+        ByteBuf wholeByteBuf = 
Unpooled.wrappedUnmodifiableBuffer(prefixByteBuf, contentByteBuf);
+        flushContext.byteBuf = wholeByteBuf;
         if (State.CLOSING == state || State.CLOSED == state){
             failureCallbackByContextAndRecycle(flushContext, 
BUFFERED_WRITER_CLOSED_EXCEPTION);
         } else {
-            managedLedger.asyncAddEntry(pairByteBuf, this, this.flushContext);
-        }
-        // Clear buffers.ok
-        this.dataArray.clear();
-        this.flushContext = FlushContext.newInstance();
-        this.bytesSize = 0;
-    }
-
-    /**
-     * see {@link AsyncCallbacks.AddEntryCallback#addComplete(Position, 
ByteBuf, Object)}.
-     */
-    @Override
-    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-        final FlushContext flushContext = (FlushContext) ctx;
-        try {
-            final int batchSize = flushContext.asyncAddArgsList.size();
-            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
-                final AsyncAddArgs asyncAddArgs = 
flushContext.asyncAddArgsList.get(batchIndex);
-                final TxnBatchedPositionImpl txnBatchedPosition = new 
TxnBatchedPositionImpl(position, batchSize,
-                        batchIndex);
-                // Because this task already running at ordered task, so just 
"run".
-                try {
-                    asyncAddArgs.callback.addComplete(txnBatchedPosition, 
asyncAddArgs.ctx);
-                } catch (Exception e){
-                    log.error("After writing to the transaction batched log 
complete, the callback failed."
-                            + " managedLedger: " + managedLedger.getName(), e);
-                }
-            }
-        } finally {
-            flushContext.recycle();
+            managedLedger.asyncAddEntry(wholeByteBuf, 
bookKeeperBatchedWriteCallback, flushContext);
         }
+        dataArray.clear();
+        flushContext = FlushContext.newInstance();
+        bytesSize = 0;
     }
 
     /**
-     * see {@link 
AsyncCallbacks.AddEntryCallback#addFailed(ManagedLedgerException, Object)}.
-     */
-    @Override
-    public void addFailed(ManagedLedgerException exception, Object ctx) {
-        final FlushContext flushContext = (FlushContext) ctx;
-        failureCallbackByContextAndRecycle(flushContext, exception);
-    }
-
-    /**
-     * Cancel pending tasks and release resources.
+     * Release resources and cancel pending tasks.
      */
     @Override
     public void close() {
-        // If disabled batch feature, there is no closing state.
+        // If batch feature is disabled, there is nothing to close, so set the 
stat only.
         if (!batchEnabled) {
             STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
             return;
         }
-        // Prevent the reentrant.
+        // If other thread already called "close()", so do nothing.
         if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
-            // Other thread also calling "close()".
             return;
         }
         // Cancel pending tasks and release resources.
         singleThreadExecutorForWrite.execute(() -> {
-            if (state == State.CLOSED){
-                return;
-            }
-            // Failure callback to pending request.
-            // If some request has been flushed, Bookie triggers the callback.
-            failureCallbackByContextAndRecycle(this.flushContext, 
BUFFERED_WRITER_CLOSED_EXCEPTION);
-            // Cancel task that schedule at fixed rate trig flush.
-            if (timeout == null){
-                log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The field-timeout"
-                        + " is null. managedLedger: " + 
managedLedger.getName());
-            } else if (timeout.isCancelled()){
-                // TODO How decisions the timer-task has been finished ?
-                this.state = State.CLOSED;
-            } else {
-                if (this.timeout.cancel()) {
-                    this.state = State.CLOSED;
+            try {
+                if (state == State.CLOSED) {
+                    return;
+                }
+                // If some requests are flushed, BK will trigger these 
callbacks, and the remaining requests in should
+                // fail.
+                failureCallbackByContextAndRecycle(flushContext, 
BUFFERED_WRITER_CLOSED_EXCEPTION);
+                // Cancel the timing task.
+                if (timeout == null) {
+                    log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The field-timeout"
+                            + " is null. managedLedger: " + 
managedLedger.getName());
+                } else if (timeout.isCancelled()) {
+                    // TODO How decisions the timer-task has been finished ?
+                    STATE_UPDATER.set(this, State.CLOSED);
                 } else {
-                    // Cancel task failure, The state will stay at CLOSING.
-                    log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The state will"
-                            + " stay at CLOSING. managedLedger: " + 
managedLedger.getName());
+                    if (this.timeout.cancel()) {
+                        STATE_UPDATER.set(this, State.CLOSED);
+                    } else {
+                        // Cancel task failure, The state will stay at CLOSING.
+                        log.error("Cancel timeout-task that schedule at fixed 
rate trig flush failure. The state will"
+                                + " stay at CLOSING. managedLedger: " + 
managedLedger.getName());
+                    }
                 }
+            } catch (Exception e){
+                log.error("Close Txn log buffered writer fail", e);

Review Comment:
   Good idea — already fixed.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.io.Closeable;
+import lombok.Getter;
+
+/***
+ * Describes the working status of the {@link TxnLogBufferedWriter}, helps 
users tune the thresholds of
+ * {@link TxnLogBufferedWriter} for best performance.
+ * Note-1: When batch feature is turned off, no data is logged at this. In 
this scenario,users can see the
+ *   {@link org.apache.bookkeeper.mledger.ManagedLedgerMXBean}.
+ * Note-2: Even if enable batch feature. A batch has numerous triggers. The 
metrics in this class count each type of
+ *   trigger to allow you to diagnose what mostly causing a batch flush. The 
metric also includes a histogram for delay
+ *   of batch since 1st record entered, the size of batch in bytes number of 
records in batch. This will help you to
+ *   tune the parameters that control some of the batch flush triggers: 
maxDelay, maxRecords, maxSize.
+ *   Note that the 4th trigger - a single record larger than batch size - 
triggers a flush of the current batch, but
+ *   the big record itself is not written in batch hence is not included in 
the batch metrics written above (batch
+ *   size, batch delay, etc). The trigger is of course counted as other 
trigger types.
+ */
+public class TxnLogBufferedWriterMetricsStats implements Closeable {
+
+    static final double[] RECORD_COUNT_PER_ENTRY_BUCKETS = {10, 50, 100, 200, 
500, 1_000};
+
+    static final double[] BYTES_SIZE_PER_ENTRY_BUCKETS = {128, 512, 1_024, 
2_048, 4_096, 16_384,
+            102_400, 1_232_896};
+
+    static final double[] MAX_DELAY_TIME_BUCKETS = {1, 5, 10};
+
+    @Getter
+    private final String metricsPrefix;
+
+    private final String[] labelNames;
+
+    private final String[] labelValues;
+
+    /** Count of records in per transaction log batch. **/
+    private final Histogram recordsPerBatchMetric;
+    private final Histogram.Child recordsPerBatchHistogram;
+
+    /** Bytes size per transaction log batch. **/
+    private final Histogram batchSizeBytesMetric;
+    private final Histogram.Child batchSizeBytesHistogram;
+
+    /** The time of the oldest transaction log spent in the buffer before 
being sent. **/
+    private final Histogram oldestRecordInBatchDelayTimeSecondsMetric;
+    private final Histogram.Child oldestRecordInBatchDelayTimeSecondsHistogram;
+
+    /** The count of the triggering transaction log batch flush actions by 
"batchedWriteMaxRecords". **/
+    private final Counter batchFlushTriggeredByMaxRecordsMetric;
+    private final Counter.Child batchFlushTriggeredByMaxRecordsCounter;
+
+    /** The count of the triggering transaction log batch flush actions by 
"batchedWriteMaxSize". **/
+    private final Counter batchFlushTriggeredByMaxSizeMetric;
+    private final Counter.Child batchFlushTriggeredByMaxSizeCounter;
+
+    /** The count of the triggering transaction log batch flush actions by 
"batchedWriteMaxDelayInMillis". **/
+    private final Counter batchFlushTriggeredByMaxDelayMetric;
+    private final Counter.Child batchFlushTriggeredByMaxDelayCounter;
+
+    /**
+     * If {@link TxnLogBufferedWriter#asyncAddData(Object, 
TxnLogBufferedWriter.AddDataCallback, Object)} accept a
+     * request that param-data is too large (larger than 
"batchedWriteMaxSize"), then two flushes are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
+    private final Counter batchFlushTriggeredByLargeSingleDataMetric;
+    private final Counter.Child batchFlushTriggeredByLargeSingleDataCounter;
+
+    public void close() {

Review Comment:
   Already fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to