PHOENIX-2557 Track unfree memory for server-side cache
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/31a414c8 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/31a414c8 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/31a414c8 Branch: refs/heads/4.x-HBase-1.0 Commit: 31a414c84e64a1de366703cb1faa25c9e506d1a3 Parents: 139e0d7 Author: James Taylor <[email protected]> Authored: Sun Jan 3 15:51:40 2016 -0800 Committer: James Taylor <[email protected]> Committed: Mon Jan 4 09:36:30 2016 -0800 ---------------------------------------------------------------------- .../ConnectionQueryServicesTestImpl.java | 4 +- .../org/apache/phoenix/cache/GlobalCache.java | 23 ++- .../org/apache/phoenix/cache/TenantCache.java | 4 +- .../apache/phoenix/cache/TenantCacheImpl.java | 28 ++- .../coprocessor/MetaDataEndpointImpl.java | 5 +- .../coprocessor/ServerCachingEndpointImpl.java | 7 +- .../coprocessor/generated/MetaDataProtos.java | 171 +++++++++++++++---- .../phoenix/memory/GlobalMemoryManager.java | 6 + .../phoenix/query/ConnectionQueryServices.java | 2 +- .../query/ConnectionQueryServicesImpl.java | 39 +++-- .../query/ConnectionlessQueryServicesImpl.java | 9 +- .../query/DelegateConnectionQueryServices.java | 4 +- .../apache/phoenix/cache/TenantCacheTest.java | 100 +++++++++++ phoenix-protocol/src/main/MetaDataService.proto | 1 + 14 files changed, 332 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java index d941fd5..f5d7f18 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java @@ -81,7 +81,9 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl Set<PhoenixConnection> connections = this.connections; this.connections = Sets.newHashSet(); SQLCloseables.closeAll(connections); - clearCache(); + long unfreedBytes = clearCache(); + // FIXME: once PHOENIX-2556 is fixed, comment this back in + // assertEquals(0,unfreedBytes); } finally { super.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index af5438c..7d04f5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.ChildMemoryManager; import org.apache.phoenix.memory.GlobalMemoryManager; @@ -36,6 +38,8 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.util.SizedUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -51,6 +55,7 @@ import com.google.common.cache.Weigher; * @since 0.1 */ public class GlobalCache extends TenantCacheImpl { + private static final Logger logger = LoggerFactory.getLogger(GlobalCache.class); private static volatile GlobalCache INSTANCE; private final Configuration config; @@ -59,8 +64,24 @@ public class GlobalCache extends TenantCacheImpl { // Cache for lastest PTable for a given Phoenix table private volatile Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache; - public void clearTenantCache() { + public long clearTenantCache() { + long unfreedBytes = getMemoryManager().getMaxMemory() - getMemoryManager().getAvailableMemory(); + if (unfreedBytes != 0 && logger.isDebugEnabled()) { + logger.debug("Found " + (getMemoryManager().getMaxMemory() - getMemoryManager().getAvailableMemory()) + " bytes not freed from global cache"); + } + removeAllServerCache(); + for (Map.Entry<ImmutableBytesWritable, TenantCache> entry : perTenantCacheMap.entrySet()) { + TenantCache cache = entry.getValue(); + long unfreedTenantBytes = cache.getMemoryManager().getMaxMemory() - cache.getMemoryManager().getAvailableMemory(); + if (unfreedTenantBytes != 0 && logger.isDebugEnabled()) { + ImmutableBytesWritable cacheId = entry.getKey(); + logger.debug("Found " + unfreedTenantBytes + " bytes not freed for tenant " + Bytes.toStringBinary(cacheId.get(), cacheId.getOffset(), cacheId.getLength())); + } + unfreedBytes += unfreedTenantBytes; + cache.removeAllServerCache(); + } perTenantCacheMap.clear(); + return unfreedBytes; } public Cache<ImmutableBytesPtr,PMetaDataEntity> getMetaDataCache() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java index c7cd58f..5c33967 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.sql.SQLException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.MemoryManager; @@ -38,5 +37,6 @@ public interface TenantCache { MemoryManager getMemoryManager(); Closeable getServerCache(ImmutableBytesPtr cacheId); Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException; - void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException; + void removeServerCache(ImmutableBytesPtr cacheId); + void removeAllServerCache(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java index 6ef7a6f..658b4cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java @@ -29,6 +29,7 @@ import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.util.Closeables; +import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; @@ -45,11 +46,30 @@ import com.google.common.cache.RemovalNotification; public class TenantCacheImpl implements TenantCache { private final int maxTimeToLiveMs; private final MemoryManager memoryManager; + private final Ticker ticker; private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches; public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) { + this(memoryManager, maxTimeToLiveMs, Ticker.systemTicker()); + } + + public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, Ticker ticker) { this.memoryManager = memoryManager; this.maxTimeToLiveMs = maxTimeToLiveMs; + this.ticker = ticker; + } + + public Ticker getTicker() { + return ticker; + } + + // For testing + public void cleanUp() { + synchronized(this) { + if (serverCaches != null) { + serverCaches.cleanUp(); + } + } } @Override @@ -64,6 +84,7 @@ public class TenantCacheImpl implements TenantCache { if (serverCaches == null) { serverCaches = CacheBuilder.newBuilder() .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS) + .ticker(getTicker()) .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){ @Override public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) { @@ -99,7 +120,12 @@ public class TenantCacheImpl implements TenantCache { } @Override - public void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException { + public void removeServerCache(ImmutableBytesPtr cacheId) { getServerCaches().invalidate(cacheId); } + + @Override + public void removeAllServerCache() { + getServerCaches().invalidateAll(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index ea4a7e1..13876a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -2667,7 +2667,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); metaDataCache.invalidateAll(); - cache.clearTenantCache(); + long unfreedBytes = cache.clearTenantCache(); + ClearCacheResponse.Builder builder = ClearCacheResponse.newBuilder(); + builder.setUnfreedBytes(unfreedBytes); + done.run(builder.build()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java index 9f3bdb4..bf889d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java @@ -18,7 +18,6 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; -import java.sql.SQLException; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -91,11 +90,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C tenantId = new ImmutableBytesPtr(request.getTenantId().toByteArray()); } TenantCache tenantCache = GlobalCache.getTenantCache(this.env, tenantId); - try { - tenantCache.removeServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray())); - } catch (SQLException e) { - ProtobufUtil.setControllerException(controller, new IOException(e)); - } + tenantCache.removeServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray())); RemoveServerCacheResponse.Builder responseBuilder = RemoveServerCacheResponse.newBuilder(); responseBuilder.setReturn(true); RemoveServerCacheResponse result = responseBuilder.build(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java index a121d28..dc5726a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java @@ -8767,6 +8767,16 @@ public final class MetaDataProtos { public interface ClearCacheResponseOrBuilder extends com.google.protobuf.MessageOrBuilder { + + // optional int64 unfreedBytes = 1; + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + boolean hasUnfreedBytes(); + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + long getUnfreedBytes(); } /** * Protobuf type {@code ClearCacheResponse} @@ -8801,6 +8811,7 @@ public final class MetaDataProtos { com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); + int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { @@ -8818,6 +8829,11 @@ public final class MetaDataProtos { } break; } + case 8: { + bitField0_ |= 0x00000001; + unfreedBytes_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -8857,7 +8873,25 @@ public final class MetaDataProtos { return PARSER; } + private int bitField0_; + // optional int64 unfreedBytes = 1; + public static final int UNFREEDBYTES_FIELD_NUMBER = 1; + private long unfreedBytes_; + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + public boolean hasUnfreedBytes() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + public long getUnfreedBytes() { + return unfreedBytes_; + } + private void initFields() { + unfreedBytes_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8871,6 +8905,9 @@ public final class MetaDataProtos { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt64(1, unfreedBytes_); + } getUnknownFields().writeTo(output); } @@ -8880,6 +8917,10 @@ public final class MetaDataProtos { if (size != -1) return size; size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(1, unfreedBytes_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -8903,6 +8944,11 @@ public final class MetaDataProtos { org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse other = (org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse) obj; boolean result = true; + result = result && (hasUnfreedBytes() == other.hasUnfreedBytes()); + if (hasUnfreedBytes()) { + result = result && (getUnfreedBytes() + == other.getUnfreedBytes()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8916,6 +8962,10 @@ public final class MetaDataProtos { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasUnfreedBytes()) { + hash = (37 * hash) + UNFREEDBYTES_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getUnfreedBytes()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -9025,6 +9075,8 @@ public final class MetaDataProtos { public Builder clear() { super.clear(); + unfreedBytes_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); return this; } @@ -9051,6 +9103,13 @@ public final class MetaDataProtos { public org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse buildPartial() { org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse result = new org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.unfreedBytes_ = unfreedBytes_; + result.bitField0_ = to_bitField0_; onBuilt(); return result; } @@ -9066,6 +9125,9 @@ public final class MetaDataProtos { public Builder mergeFrom(org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse other) { if (other == org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse.getDefaultInstance()) return this; + if (other.hasUnfreedBytes()) { + setUnfreedBytes(other.getUnfreedBytes()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -9091,6 +9153,40 @@ public final class MetaDataProtos { } return this; } + private int bitField0_; + + // optional int64 unfreedBytes = 1; + private long unfreedBytes_ ; + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + public boolean hasUnfreedBytes() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + public long getUnfreedBytes() { + return unfreedBytes_; + } + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + public Builder setUnfreedBytes(long value) { + bitField0_ |= 0x00000001; + unfreedBytes_ = value; + onChanged(); + return this; + } + /** + * <code>optional int64 unfreedBytes = 1;</code> + */ + public Builder clearUnfreedBytes() { + bitField0_ = (bitField0_ & ~0x00000001); + unfreedBytes_ = 0L; + onChanged(); + return this; + } // @@protoc_insertion_point(builder_scope:ClearCacheResponse) } @@ -12285,42 +12381,43 @@ public final class MetaDataProtos { "ateIndexStateRequest\022\036\n\026tableMetadataMut" + "ations\030\001 \003(\014\022\025\n\rclientVersion\030\002 \001(\005\"*\n\021C", "learCacheRequest\022\025\n\rclientVersion\030\001 \001(\005\"" + - "\024\n\022ClearCacheResponse\"*\n\021GetVersionReque" + - "st\022\025\n\rclientVersion\030\001 \001(\005\"%\n\022GetVersionR" + - "esponse\022\017\n\007version\030\001 \002(\003\"\205\001\n\032ClearTableF" + - "romCacheRequest\022\020\n\010tenantId\030\001 \002(\014\022\022\n\nsch" + - "emaName\030\002 \002(\014\022\021\n\ttableName\030\003 \002(\014\022\027\n\017clie" + - "ntTimestamp\030\004 \002(\003\022\025\n\rclientVersion\030\005 \001(\005" + - "\"\035\n\033ClearTableFromCacheResponse*\365\002\n\014Muta" + - "tionCode\022\030\n\024TABLE_ALREADY_EXISTS\020\000\022\023\n\017TA" + - "BLE_NOT_FOUND\020\001\022\024\n\020COLUMN_NOT_FOUND\020\002\022\031\n", - "\025COLUMN_ALREADY_EXISTS\020\003\022\035\n\031CONCURRENT_T" + - "ABLE_MUTATION\020\004\022\027\n\023TABLE_NOT_IN_REGION\020\005" + - "\022\025\n\021NEWER_TABLE_FOUND\020\006\022\034\n\030UNALLOWED_TAB" + - "LE_MUTATION\020\007\022\021\n\rNO_PK_COLUMNS\020\010\022\032\n\026PARE" + - "NT_TABLE_NOT_FOUND\020\t\022\033\n\027FUNCTION_ALREADY" + - "_EXISTS\020\n\022\026\n\022FUNCTION_NOT_FOUND\020\013\022\030\n\024NEW" + - "ER_FUNCTION_FOUND\020\014\022\032\n\026FUNCTION_NOT_IN_R" + - "EGION\020\r2\304\005\n\017MetaDataService\022/\n\010getTable\022" + - "\020.GetTableRequest\032\021.MetaDataResponse\0227\n\014" + - "getFunctions\022\024.GetFunctionsRequest\032\021.Met", - "aDataResponse\0225\n\013createTable\022\023.CreateTab" + - "leRequest\032\021.MetaDataResponse\022;\n\016createFu" + - "nction\022\026.CreateFunctionRequest\032\021.MetaDat" + - "aResponse\0221\n\tdropTable\022\021.DropTableReques" + - "t\032\021.MetaDataResponse\0227\n\014dropFunction\022\024.D" + - "ropFunctionRequest\032\021.MetaDataResponse\0221\n" + - "\taddColumn\022\021.AddColumnRequest\032\021.MetaData" + - "Response\0223\n\ndropColumn\022\022.DropColumnReque" + - "st\032\021.MetaDataResponse\022?\n\020updateIndexStat" + - "e\022\030.UpdateIndexStateRequest\032\021.MetaDataRe", - "sponse\0225\n\nclearCache\022\022.ClearCacheRequest" + - "\032\023.ClearCacheResponse\0225\n\ngetVersion\022\022.Ge" + - "tVersionRequest\032\023.GetVersionResponse\022P\n\023" + - "clearTableFromCache\022\033.ClearTableFromCach" + - "eRequest\032\034.ClearTableFromCacheResponseBB" + - "\n(org.apache.phoenix.coprocessor.generat" + - "edB\016MetaDataProtosH\001\210\001\001\240\001\001" + "*\n\022ClearCacheResponse\022\024\n\014unfreedBytes\030\001 " + + "\001(\003\"*\n\021GetVersionRequest\022\025\n\rclientVersio" + + "n\030\001 \001(\005\"%\n\022GetVersionResponse\022\017\n\007version" + + "\030\001 \002(\003\"\205\001\n\032ClearTableFromCacheRequest\022\020\n" + + "\010tenantId\030\001 \002(\014\022\022\n\nschemaName\030\002 \002(\014\022\021\n\tt" + + "ableName\030\003 \002(\014\022\027\n\017clientTimestamp\030\004 \002(\003\022" + + "\025\n\rclientVersion\030\005 \001(\005\"\035\n\033ClearTableFrom" + + "CacheResponse*\365\002\n\014MutationCode\022\030\n\024TABLE_" + + "ALREADY_EXISTS\020\000\022\023\n\017TABLE_NOT_FOUND\020\001\022\024\n", + "\020COLUMN_NOT_FOUND\020\002\022\031\n\025COLUMN_ALREADY_EX" + + "ISTS\020\003\022\035\n\031CONCURRENT_TABLE_MUTATION\020\004\022\027\n" + + "\023TABLE_NOT_IN_REGION\020\005\022\025\n\021NEWER_TABLE_FO" + + "UND\020\006\022\034\n\030UNALLOWED_TABLE_MUTATION\020\007\022\021\n\rN" + + "O_PK_COLUMNS\020\010\022\032\n\026PARENT_TABLE_NOT_FOUND" + + "\020\t\022\033\n\027FUNCTION_ALREADY_EXISTS\020\n\022\026\n\022FUNCT" + + "ION_NOT_FOUND\020\013\022\030\n\024NEWER_FUNCTION_FOUND\020" + + "\014\022\032\n\026FUNCTION_NOT_IN_REGION\020\r2\304\005\n\017MetaDa" + + "taService\022/\n\010getTable\022\020.GetTableRequest\032" + + "\021.MetaDataResponse\0227\n\014getFunctions\022\024.Get", + "FunctionsRequest\032\021.MetaDataResponse\0225\n\013c" + + "reateTable\022\023.CreateTableRequest\032\021.MetaDa" + + "taResponse\022;\n\016createFunction\022\026.CreateFun" + + "ctionRequest\032\021.MetaDataResponse\0221\n\tdropT" + + "able\022\021.DropTableRequest\032\021.MetaDataRespon" + + "se\0227\n\014dropFunction\022\024.DropFunctionRequest" + + "\032\021.MetaDataResponse\0221\n\taddColumn\022\021.AddCo" + + "lumnRequest\032\021.MetaDataResponse\0223\n\ndropCo" + + "lumn\022\022.DropColumnRequest\032\021.MetaDataRespo" + + "nse\022?\n\020updateIndexState\022\030.UpdateIndexSta", + "teRequest\032\021.MetaDataResponse\0225\n\nclearCac" + + "he\022\022.ClearCacheRequest\032\023.ClearCacheRespo" + + "nse\0225\n\ngetVersion\022\022.GetVersionRequest\032\023." + + "GetVersionResponse\022P\n\023clearTableFromCach" + + "e\022\033.ClearTableFromCacheRequest\032\034.ClearTa" + + "bleFromCacheResponseBB\n(org.apache.phoen" + + "ix.coprocessor.generatedB\016MetaDataProtos" + + "H\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -12398,7 +12495,7 @@ public final class MetaDataProtos { internal_static_ClearCacheResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ClearCacheResponse_descriptor, - new java.lang.String[] { }); + new java.lang.String[] { "UnfreedBytes", }); internal_static_GetVersionRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_GetVersionRequest_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java index e70b35f..74f2c5f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java @@ -73,6 +73,7 @@ public class GlobalMemoryManager implements MemoryManager { synchronized(sync) { while (usedMemoryBytes + minBytes > maxMemoryBytes) { // Only wait if minBytes not available try { + logger.debug("Waiting for " + (usedMemoryBytes + minBytes - maxMemoryBytes) + " bytes to be free"); long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() - startTimeMs); if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for some memory to get freed up throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes could not be allocated. Using memory of " + usedMemoryBytes + " bytes from global pool of " + maxMemoryBytes + " bytes after waiting for " + maxWaitMs + "ms."); @@ -109,12 +110,15 @@ public class GlobalMemoryManager implements MemoryManager { private class GlobalMemoryChunk implements MemoryChunk { private volatile long size; + //private volatile String stack; private GlobalMemoryChunk(long size) { if (size < 0) { throw new IllegalStateException("Size of memory chunk must be greater than zero, but instead is " + size); } this.size = size; + // Useful for debugging where a piece of memory was allocated + // this.stack = ExceptionUtils.getStackTrace(new Throwable()); } @Override @@ -138,6 +142,7 @@ public class GlobalMemoryManager implements MemoryManager { } else { allocateBytes(nAdditionalBytes, nAdditionalBytes); size = nBytes; + //this.stack = ExceptionUtils.getStackTrace(new Throwable()); } } } @@ -150,6 +155,7 @@ public class GlobalMemoryManager implements MemoryManager { try { if (size > 0) { logger.warn("Orphaned chunk of " + size + " bytes found during finalize"); + //logger.warn("Orphaned chunk of " + size + " bytes found during finalize allocated here:\n" + stack); } freeMemory(); } finally { http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index fc41706..d839fa3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -116,7 +116,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) throws SQLException; - public void clearCache() throws SQLException; + public long clearCache() throws SQLException; public int getSequenceSaltBuckets(); TransactionSystemClient getTransactionSystemClient(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 1ea7985..29b9756 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -2505,27 +2505,35 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * @throws SQLException */ @Override - public void clearCache() throws SQLException { + public long clearCache() throws SQLException { try { SQLException sqlE = null; HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); try { - htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, ClearCacheResponse>() { - @Override - public ClearCacheResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<ClearCacheResponse> rpcCallback = - new BlockingRpcCallback<ClearCacheResponse>(); - ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder(); - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - instance.clearCache(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); + final Map<byte[], Long> results = + htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() { + @Override + public Long call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<ClearCacheResponse> rpcCallback = + new BlockingRpcCallback<ClearCacheResponse>(); + ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder(); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.clearCache(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get().getUnfreedBytes(); } - return rpcCallback.get(); + }); + long unfreedBytes = 0; + for (Map.Entry<byte[],Long> result : results.entrySet()) { + if (result.getValue() != null) { + unfreedBytes += result.getValue(); } - }); + } + return unfreedBytes; } catch (IOException e) { throw ServerUtil.parseServerException(e); } catch (Throwable e) { @@ -2549,6 +2557,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (Exception e) { throw new SQLException(ServerUtil.parseServerException(e)); } + return 0; } private void flushTable(byte[] tableName) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index e983a4c..f1ab319 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -83,13 +83,13 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SequenceUtil; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import co.cask.tephra.TransactionManager; import co.cask.tephra.TransactionSystemClient; import co.cask.tephra.inmemory.InMemoryTxSystemClient; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - /** * @@ -512,7 +512,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple } @Override - public void clearCache() throws SQLException { + public long clearCache() throws SQLException { + return 0; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index ca662be..b56ff85 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -251,8 +251,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple @Override - public void clearCache() throws SQLException { - getDelegate().clearCache(); + public long clearCache() throws SQLException { + return getDelegate().clearCache(); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java new file mode 100644 index 0000000..ac2a850 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.cache; + +import static org.junit.Assert.assertEquals; + +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.memory.GlobalMemoryManager; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.util.ByteUtil; +import org.junit.Test; + +import com.google.common.base.Ticker; + +public class TenantCacheTest { + + @Test + public void testInvalidateClosesMemoryChunk() throws SQLException { + int maxServerCacheTimeToLive = 10000; + long maxBytes = 1000; + int maxWaitMs = 1000; + GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs); + TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive); + ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a")); + ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); + newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory); + assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); + newTenantCache.removeServerCache(cacheId); + assertEquals(maxBytes, memoryManager.getAvailableMemory()); + } + + @Test + public void testTimeoutClosesMemoryChunk() throws Exception { + int maxServerCacheTimeToLive = 10; + long maxBytes = 1000; + int maxWaitMs = 10; + GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs); + ManualTicker ticker = new ManualTicker(); + TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); + ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); + ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory); + assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); + ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; + cache.cleanUp(); + assertEquals(maxBytes, memoryManager.getAvailableMemory()); + } + + public static class ManualTicker extends Ticker { + public long time = 0; + + @Override + public long read() { + return time; + } + + } + + public static ServerCacheFactory cacheFactory = new ServerCacheFactory() { + + @Override + public void readFields(DataInput arg0) throws IOException { + } + + @Override + public void write(DataOutput arg0) throws IOException { + } + + @Override + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) + throws SQLException { + return chunk; + } + + }; +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-protocol/src/main/MetaDataService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto index c265158..c631512 100644 --- a/phoenix-protocol/src/main/MetaDataService.proto +++ b/phoenix-protocol/src/main/MetaDataService.proto @@ -119,6 +119,7 @@ message ClearCacheRequest { } message ClearCacheResponse { + optional int64 unfreedBytes = 1; } message GetVersionRequest {
