This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb Author: Daniel Kulp <dk...@apache.org> AuthorDate: Wed Apr 28 14:16:31 2021 -0400 [BEAM-12247] Reduce memory/string creations in InMemoryTimerInternals --- .../beam/runners/core/InMemoryTimerInternals.java | 46 ++++++++-------------- 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 8be9081..d0b3bed 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -63,6 +63,9 @@ public class InMemoryTimerInternals implements TimerInternals { /** Current synchronized processing time. */ private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + /** Class.getSimpleName() cached to avoid allocations for tracing. */ + private static final String SIMPLE_NAME = InMemoryTimerInternals.class.getSimpleName(); + @Override public @Nullable Instant currentOutputWatermarkTime() { return outputWatermarkTime; @@ -125,17 +128,12 @@ public class InMemoryTimerInternals implements TimerInternals { @Deprecated @Override public void setTimer(TimerData timerData) { - WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); + WindowTracing.trace("{}.setTimer: {}", SIMPLE_NAME, timerData); - @Nullable - TimerData existing = - existingTimers.get( - timerData.getNamespace(), timerData.getTimerId() + '+' + timerData.getTimerFamilyId()); + @Nullable String colKey = timerData.getTimerId() + '+' + timerData.getTimerFamilyId(); + TimerData existing = existingTimers.get(timerData.getNamespace(), colKey); if (existing == null) { - existingTimers.put( - timerData.getNamespace(), - timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), - timerData); + existingTimers.put(timerData.getNamespace(), colKey, timerData); timersForDomain(timerData.getDomain()).add(timerData); } else { checkArgument( @@ -149,10 +147,7 @@ public class InMemoryTimerInternals implements TimerInternals { NavigableSet<TimerData> timers = timersForDomain(timerData.getDomain()); timers.remove(existing); timers.add(timerData); - existingTimers.put( - timerData.getNamespace(), - timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), - timerData); + existingTimers.put(timerData.getNamespace(), colKey, timerData); } } } @@ -216,7 +211,7 @@ public class InMemoryTimerInternals implements TimerInternals { newInputWatermark); WindowTracing.trace( "{}.advanceInputWatermark: from {} to {}", - getClass().getSimpleName(), + SIMPLE_NAME, inputWatermarkTime, newInputWatermark); inputWatermarkTime = newInputWatermark; @@ -229,7 +224,7 @@ public class InMemoryTimerInternals implements TimerInternals { if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "{}.advanceOutputWatermark: clipping output watermark from {} to {}", - getClass().getSimpleName(), + SIMPLE_NAME, newOutputWatermark, inputWatermarkTime); adjustedOutputWatermark = inputWatermarkTime; @@ -244,7 +239,7 @@ public class InMemoryTimerInternals implements TimerInternals { adjustedOutputWatermark); WindowTracing.trace( "{}.advanceOutputWatermark: from {} to {}", - getClass().getSimpleName(), + SIMPLE_NAME, outputWatermarkTime, adjustedOutputWatermark); outputWatermarkTime = adjustedOutputWatermark; @@ -259,10 +254,7 @@ public class InMemoryTimerInternals implements TimerInternals { processingTime, newProcessingTime); WindowTracing.trace( - "{}.advanceProcessingTime: from {} to {}", - getClass().getSimpleName(), - processingTime, - newProcessingTime); + "{}.advanceProcessingTime: from {} to {}", SIMPLE_NAME, processingTime, newProcessingTime); processingTime = newProcessingTime; } @@ -277,7 +269,7 @@ public class InMemoryTimerInternals implements TimerInternals { newSynchronizedProcessingTime); WindowTracing.trace( "{}.advanceProcessingTime: from {} to {}", - getClass().getSimpleName(), + SIMPLE_NAME, synchronizedProcessingTime, newSynchronizedProcessingTime); synchronizedProcessingTime = newSynchronizedProcessingTime; @@ -288,10 +280,7 @@ public class InMemoryTimerInternals implements TimerInternals { TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME); if (timer != null) { WindowTracing.trace( - "{}.removeNextEventTimer: firing {} at {}", - getClass().getSimpleName(), - timer, - inputWatermarkTime); + "{}.removeNextEventTimer: firing {} at {}", SIMPLE_NAME, timer, inputWatermarkTime); } return timer; } @@ -301,10 +290,7 @@ public class InMemoryTimerInternals implements TimerInternals { TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME); if (timer != null) { WindowTracing.trace( - "{}.removeNextProcessingTimer: firing {} at {}", - getClass().getSimpleName(), - timer, - processingTime); + "{}.removeNextProcessingTimer: firing {} at {}", SIMPLE_NAME, timer, processingTime); } return timer; } @@ -316,7 +302,7 @@ public class InMemoryTimerInternals implements TimerInternals { if (timer != null) { WindowTracing.trace( "{}.removeNextSynchronizedProcessingTimer: firing {} at {}", - getClass().getSimpleName(), + SIMPLE_NAME, timer, synchronizedProcessingTime); }