Github user maryannxue commented on a diff in the pull request: https://github.com/apache/phoenix/pull/298#discussion_r183130270 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java --- @@ -77,57 +153,164 @@ public MemoryManager getMemoryManager() { return memoryManager; } - private Cache<ImmutableBytesPtr,Closeable> getServerCaches() { + private Cache<ImmutableBytesPtr,CacheEntry> getServerCaches() { /* Delay creation of this map until it's needed */ if (serverCaches == null) { synchronized(this) { if (serverCaches == null) { - serverCaches = CacheBuilder.newBuilder() - .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS) - .ticker(getTicker()) - .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){ - @Override - public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) { - Closeables.closeAllQuietly(Collections.singletonList(notification.getValue())); - } - }) - .build(); + serverCaches = buildCache(maxTimeToLiveMs, false); } } } return serverCaches; } + + private Cache<ImmutableBytesPtr,CacheEntry> getPersistentServerCaches() { + /* Delay creation of this map until it's needed */ + if (persistentServerCaches == null) { + synchronized(this) { + if (persistentServerCaches == null) { + persistentServerCaches = buildCache(maxPersistenceTimeToLiveMs, true); + } + } + } + return persistentServerCaches; + } + + private Cache<ImmutableBytesPtr, CacheEntry> buildCache(final int ttl, final boolean isPersistent) { + return CacheBuilder.newBuilder() + .expireAfterAccess(ttl, TimeUnit.MILLISECONDS) + .ticker(getTicker()) + .removalListener(new RemovalListener<ImmutableBytesPtr, CacheEntry>(){ + @Override + public void onRemoval(RemovalNotification<ImmutableBytesPtr, CacheEntry> notification) { + if (isPersistent || !notification.getValue().getUsePersistentCache()) { + Closeables.closeAllQuietly(Collections.singletonList(notification.getValue())); + } + } + }) + .build(); + } - @Override + private void evictInactiveEntries(long bytesNeeded) { + CacheEntry[] entries = getPersistentServerCaches().asMap().values().toArray(new CacheEntry[]{}); + Arrays.sort(entries); + long available = this.getMemoryManager().getAvailableMemory(); + for (int i = 0; i < entries.length && available < bytesNeeded; i++) { + CacheEntry entry = entries[i]; + if (!entry.isLive()) { + getServerCaches().invalidate(entry.getCacheId()); + getPersistentServerCaches().invalidate(entry.getCacheId()); + available = this.getMemoryManager().getAvailableMemory(); + } + } + } + + private CacheEntry maybeGet(ImmutableBytesPtr cacheId) { + maybePromote(cacheId); + CacheEntry entry = getServerCaches().getIfPresent(cacheId); + return entry; + } + + private void maybePromote(ImmutableBytesPtr cacheId) { + CacheEntry entry = getPersistentServerCaches().getIfPresent(cacheId); + if (entry == null) { + return; + } + getServerCaches().put(cacheId, entry); + } + + private void maybeDemote(ImmutableBytesPtr cacheId) { + CacheEntry entry = getServerCaches().getIfPresent(cacheId); + if (entry == null) { + return; + } + entry.decrementLiveQueryCount(); + if (!entry.isLive()) { + getServerCaches().invalidate(cacheId); + } + } + + public void debugPrintCaches() { + System.out.println("Live cache:" + getServerCaches()); + for (ImmutableBytesPtr key : getServerCaches().asMap().keySet()) { + System.out.println("- " + Hex.encodeHexString(key.get()) + + " -> " + getServerCaches().getIfPresent(key).size + + " lq:" + getServerCaches().getIfPresent(key).liveQueriesCount + + " " + Hex.encodeHexString(getServerCaches().getIfPresent(key).cachePtr.get())); + } + System.out.println("Persistent cache:" + getPersistentServerCaches()); + for (ImmutableBytesPtr key : getPersistentServerCaches().asMap().keySet()) { + System.out.println("- " + Hex.encodeHexString(key.get()) + + " -> " + getPersistentServerCaches().getIfPresent(key).size + + " " + Hex.encodeHexString(getPersistentServerCaches().getIfPresent(key).cachePtr.get())); + } + } + + @Override public Closeable getServerCache(ImmutableBytesPtr cacheId) { getServerCaches().cleanUp(); - return getServerCaches().getIfPresent(cacheId); + CacheEntry entry = maybeGet(cacheId); + if (entry == null) { + return null; + } + return entry.closeable; } @Override - public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException { + public boolean checkServerCache(ImmutableBytesPtr cacheId, boolean shouldIncrementLiveQueryCount) { getServerCaches().cleanUp(); - MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length); + CacheEntry entry = maybeGet(cacheId); + if (entry != null && shouldIncrementLiveQueryCount) { + entry.incrementLiveQueryCount(); + } + return entry != null; + } + + @Override + public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException { + getServerCaches().cleanUp(); + long available = this.getMemoryManager().getAvailableMemory(); + int size = cachePtr.getLength() + txState.length; + if (size > available) { + evictInactiveEntries(Math.max(size - available + EVICTION_MARGIN_BYTES, EVICTION_MARGIN_BYTES)); + } + MemoryChunk chunk = this.getMemoryManager().allocate(size); boolean success = false; try { - Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion); - getServerCaches().put(cacheId, element); + CacheEntry entry; + synchronized(this) { + entry = maybeGet(cacheId); + if (entry == null) { + entry = new CacheEntry( + cacheId, cachePtr, cacheFactory, txState, chunk, + usePersistentCache, useProtoForIndexMaintainer, + clientVersion); + getServerCaches().put(cacheId, entry); + available = this.getMemoryManager().getAvailableMemory(); + if (usePersistentCache) { + getPersistentServerCaches().put(cacheId, entry); + } + } + entry.incrementLiveQueryCount(); + } success = true; - return element; + return entry; } finally { if (!success) { Closeables.closeAllQuietly(Collections.singletonList(chunk)); } - } + } } - + @Override - public void removeServerCache(ImmutableBytesPtr cacheId) { - getServerCaches().invalidate(cacheId); + synchronized public void removeServerCache(ImmutableBytesPtr cacheId) { --- End diff -- Is there any reason why we need to "demote" the cache instead of invalidating it here?
---