This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1a9f28b0ef889b02be0ba8e5e93158f9322b88c7 Author: liudezhi <[email protected]> AuthorDate: Mon Dec 27 11:01:40 2021 +0800 Optimize the debug log that affects performance, and unify the style (#13498) (cherry picked from commit fb4e2c8c9075506188c79ecf8fae96883a56d948) --- .../apache/pulsar/broker/service/AbstractReplicator.java | 6 ++++-- .../apache/pulsar/functions/windowing/WindowManager.java | 13 +++++++++---- .../windowing/triggers/WatermarkTimeTriggerPolicy.java | 14 ++++++++++---- .../pulsar/functions/worker/FunctionMetaDataManager.java | 5 +++-- .../org/apache/pulsar/sql/presto/PulsarRecordCursor.java | 4 +++- 5 files changed, 29 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index a404f10..0af749c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -182,8 +182,10 @@ public abstract class AbstractReplicator { if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) { CompletableFuture<Void> disconnectFuture = new CompletableFuture<>(); disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); - log.debug("[{}][{} -> {}] Replicator disconnect failed since topic has backlog", topicName, localCluster, - remoteCluster); + if (log.isDebugEnabled()) { + log.debug("[{}][{} -> {}] Replicator disconnect failed since topic has backlog", topicName, localCluster + , remoteCluster); + } return disconnectFuture; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java index 06e0b88..9f7b5bb 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java @@ -105,7 +105,9 @@ public class WindowManager<T> implements TriggerHandler { public void add(Event<T> windowEvent) { // watermark events are not added to the queue. if (windowEvent.isWatermark()) { - log.debug(String.format("Got watermark event with ts %d", windowEvent.getTimestamp())); + if (log.isDebugEnabled()) { + log.debug("Got watermark event with ts {}", windowEvent.getTimestamp()); + } } else { queue.add(windowEvent); } @@ -145,8 +147,9 @@ public class WindowManager<T> implements TriggerHandler { prevWindowEvents.clear(); if (!events.isEmpty()) { prevWindowEvents.addAll(windowEvents); - log.debug(String.format("invoking windowLifecycleListener onActivation, [%d] events in " - + "window.", events.size())); + if (log.isDebugEnabled()) { + log.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size()); + } windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime()); } else { @@ -216,7 +219,9 @@ public class WindowManager<T> implements TriggerHandler { lock.unlock(); } eventsSinceLastExpiry.set(0); - log.debug(String.format("[%d] events expired from window.", eventsToExpire.size())); + if (log.isDebugEnabled()) { + log.debug("[{}] events expired from window.", eventsToExpire.size()); + } if (!eventsToExpire.isEmpty()) { log.debug("invoking windowLifecycleListener.onExpiry"); windowLifecycleListener.onExpiry(eventsToExpire); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.java index 22722bf..a2bfca6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.java @@ -80,7 +80,9 @@ public class WatermarkTimeTriggerPolicy<T> implements TriggerPolicy<T, Long> { private void handleWaterMarkEvent(Event<T> event) { long watermarkTs = event.getTimestamp(); long windowEndTs = nextWindowEndTs; - log.debug(String.format("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)); + if (log.isDebugEnabled()) { + log.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs); + } while (windowEndTs <= watermarkTs) { long currentCount = windowManager.getEventCount(windowEndTs); evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount)); @@ -93,10 +95,14 @@ public class WatermarkTimeTriggerPolicy<T> implements TriggerPolicy<T, Long> { * window intervals based on event ts. */ long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs); - log.debug(String.format("Next aligned window end ts %d", ts)); + if (log.isDebugEnabled()) { + log.debug("Next aligned window end ts {}", ts); + } if (ts == Long.MAX_VALUE) { - log.debug(String.format("No events to process between %d and watermark ts %d", - windowEndTs, watermarkTs)); + if (log.isDebugEnabled()) { + log.debug("No events to process between {} and watermark ts {}", + windowEndTs, watermarkTs); + } break; } windowEndTs = ts; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index eddb469..3d8c07e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -413,8 +413,9 @@ public class FunctionMetaDataManager implements AutoCloseable { String functionName, long version) throws IllegalArgumentException { boolean needsScheduling = false; - - log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version); + if (log.isDebugEnabled()) { + log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version); + } // Check if we still have this function. Maybe already deleted by someone else if (this.containsFunctionMetaData(tenant, namespace, functionName)) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index 558b87b..99be9f9 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -437,7 +437,9 @@ public class PulsarRecordCursor implements RecordCursor { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - log.debug(exception, "Failed to read entries from topic %s", topicName.toString()); + if (log.isDebugEnabled()) { + log.debug(exception, "Failed to read entries from topic %s", topicName.toString()); + } outstandingReadsRequests.incrementAndGet(); //set read latency stats for failed
