[ https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=709273&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-709273 ]
ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jan/22 19:56
Start Date: 14/Jan/22 19:56
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #16495:
URL: https://github.com/apache/beam/pull/16495#discussion_r785134812
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -57,92 +65,58 @@ public static long weigh(Object o) {
/** An eviction listener that reduces the size of entries that are {@link
Shrinkable}. */
@VisibleForTesting
- static class ShrinkOnEviction implements RemovalListener<CompositeKey,
Object> {
+ static class ShrinkOnEviction implements RemovalListener<CompositeKey,
WeightedValue<Object>> {
private final
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
- CompositeKey, Object>
+ CompositeKey, WeightedValue<Object>>
cache;
+ private final LongAdder weightInBytes;
- ShrinkOnEviction(CacheBuilder<Object, Object> cacheBuilder) {
+ ShrinkOnEviction(
+ CacheBuilder<Object, WeightedValue<Object>> cacheBuilder, LongAdder
weightInBytes) {
this.cache = cacheBuilder.removalListener(this).build();
+ this.weightInBytes = weightInBytes;
}
public
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
- CompositeKey, Object>
+ CompositeKey, WeightedValue<Object>>
getCache() {
return cache;
}
@Override
- public void onRemoval(RemovalNotification<CompositeKey, Object>
removalNotification) {
+ public void onRemoval(
+ RemovalNotification<CompositeKey, WeightedValue<Object>>
removalNotification) {
+ weightInBytes.add(-removalNotification.getValue().getWeight());
if (removalNotification.wasEvicted()) {
- if (!(removalNotification.getValue() instanceof Cache.Shrinkable)) {
+ if (!(removalNotification.getValue().getValue() instanceof
Cache.Shrinkable)) {
return;
}
- Object updatedEntry = ((Shrinkable<?>)
removalNotification.getValue()).shrink();
+ Object updatedEntry = ((Shrinkable<?>)
removalNotification.getValue().getValue()).shrink();
if (updatedEntry != null) {
- cache.put(removalNotification.getKey(), updatedEntry);
+ cache.put(removalNotification.getKey(),
addWeightedValue(updatedEntry, weightInBytes));
}
}
}
}
/** A cache that never stores any values. */
public static <K, V> Cache<K, V> noop() {
- // We specifically use Guava cache since it allows for recursive
computeIfAbsent calls
- // preventing deadlock from occurring when a loading function mutates the
underlying cache
- return (Cache<K, V>)
- forCache(new
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(0)).getCache());
+ return forMaximumBytes(0L);
}
/** A cache that never evicts any values. */
public static <K, V> Cache<K, V> eternal() {
- // We specifically use Guava cache since it allows for recursive
computeIfAbsent calls
- // preventing deadlock from occurring when a loading function mutates the
underlying cache
- return (Cache<K, V>)
- forCache(
- new
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(Long.MAX_VALUE)).getCache());
+ return forMaximumBytes(Long.MAX_VALUE);
}
/**
* Uses the specified {@link PipelineOptions} to configure and return a
cache instance based upon
* parameters within {@link SdkHarnessOptions}.
*/
public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
- // We specifically use Guava cache since it allows for recursive
computeIfAbsent calls
- // preventing deadlock from occurring when a loading function mutates the
underlying cache
- return (Cache<K, V>)
- forCache(
- new ShrinkOnEviction(
- CacheBuilder.newBuilder()
- .maximumWeight(
-
options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb()
- * 1024L
- * 1024L
- / WEIGHT_RATIO)
- .weigher(
- new Weigher<Object, Object>() {
-
- @Override
- public int weigh(Object key, Object value) {
- long size;
- if (value instanceof Weighted) {
- size = Caches.weigh(key) + ((Weighted)
value).getWeight();
- } else {
- size = Caches.weigh(key) +
Caches.weigh(value);
- }
- size = size / WEIGHT_RATIO + 1;
- if (size >= Integer.MAX_VALUE) {
- LOG.warn(
- "Entry with size {} MiBs inserted into
the cache. This is larger than the maximum individual entry size of {} MiBs.
The cache will under report its memory usage by the difference. This may lead
to OutOfMemoryErrors.",
- (size / 1048576L) + 1,
- 2 * WEIGHT_RATIO * 1024);
- return Integer.MAX_VALUE;
- }
- return (int) size;
- }
- }))
- .getCache());
+ return forMaximumBytes(
+ options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() *
1048576L);
Review comment:
Swapped this multiplication by 1048576L to a bit shift.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 709273)
Time Spent: 44h 20m (was: 44h 10m)
> Optimize Java SDK harness
> -------------------------
>
> Key: BEAM-13015
> URL: https://issues.apache.org/jira/browse/BEAM-13015
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-harness
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 44h 20m
> Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)