Repository: phoenix Updated Branches: refs/heads/4.8-HBase-1.1 f34309567 -> dba8c92a0
PHOENIX-3203 Tenant cache lookup in Global Cache fails in certain conditions Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dba8c92a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dba8c92a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dba8c92a Branch: refs/heads/4.8-HBase-1.1 Commit: dba8c92a079e227ad16cf56d39402eb6f1fcef88 Parents: f343095 Author: Thomas D'Silva <[email protected]> Authored: Thu Aug 25 15:25:18 2016 -0700 Committer: Thomas D'Silva <[email protected]> Committed: Fri Aug 26 11:20:52 2016 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/index/MutableIndexIT.java | 25 +++++++++++++------- .../org/apache/phoenix/cache/GlobalCache.java | 4 ++-- .../cache/aggcache/SpillableGroupByCache.java | 5 ++-- .../phoenix/coprocessor/GroupByCache.java | 5 ++-- .../GroupedAggregateRegionObserver.java | 15 ++++++------ .../coprocessor/HashJoinRegionScanner.java | 5 ++-- .../phoenix/coprocessor/ScanRegionObserver.java | 9 ++++--- .../phoenix/index/PhoenixIndexMetaData.java | 3 +-- .../java/org/apache/phoenix/util/ScanUtil.java | 5 ++-- 9 files changed, 40 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index 29057db..6a49076 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -824,9 +824,9 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT { conn.createStatement().execute( "CREATE TABLE IF NOT EXISTS " + fullTableName + "(TENANT_ID CHAR(15) NOT NULL,"+ - "TYPE VARCHAR(25) NOT NULL,"+ + "TYPE VARCHAR(25),"+ "ENTITY_ID CHAR(15) NOT NULL,"+ - "CONSTRAINT PK_CONSTRAINT PRIMARY KEY (TENANT_ID, TYPE, ENTITY_ID)) MULTI_TENANT=TRUE " + "CONSTRAINT PK_CONSTRAINT PRIMARY KEY (TENANT_ID, ENTITY_ID)) MULTI_TENANT=TRUE " + (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "") ); // create index conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + fullTableName + " (ENTITY_ID, TYPE)"); @@ -836,14 +836,23 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT { props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "tenant1"); // connection is tenant-specific try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) { - for (int i=0; i<2; ++i) { - PreparedStatement stmt = tenantConn.prepareStatement(dml); - stmt.setString(1, "00000000000000" + String.valueOf(i)); - stmt.setString(2, String.valueOf(i)); - assertEquals(1,stmt.executeUpdate()); - } + // upsert one row + upsertRow(dml, tenantConn, 0); + tenantConn.commit(); + ResultSet rs = tenantConn.createStatement().executeQuery("SELECT ENTITY_ID FROM " + fullTableName + " ORDER BY TYPE LIMIT 5"); + assertTrue(rs.next()); + // upsert two rows which ends up using the tenant cache + upsertRow(dml, tenantConn, 1); + upsertRow(dml, tenantConn, 2); tenantConn.commit(); } } } + +private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException { + PreparedStatement stmt = tenantConn.prepareStatement(dml); + stmt.setString(1, "00000000000000" + String.valueOf(i)); + stmt.setString(2, String.valueOf(i)); + assertEquals(1,stmt.executeUpdate()); +} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index 319597c..0c3a87a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -136,7 +136,7 @@ public class GlobalCache extends TenantCacheImpl { * @param tenantId the tenant ID or null if not applicable. * @return TenantCache */ - public static TenantCache getTenantCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId) { + public static TenantCache getTenantCache(RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId) { GlobalCache globalCache = GlobalCache.getInstance(env); TenantCache tenantCache = tenantId == null ? globalCache : globalCache.getChildTenantCache(tenantId); return tenantCache; @@ -165,7 +165,7 @@ public class GlobalCache extends TenantCacheImpl { * @param tenantId the ID that identifies the tenant * @return the existing or newly created TenantCache */ - public TenantCache getChildTenantCache(ImmutableBytesWritable tenantId) { + public TenantCache getChildTenantCache(ImmutableBytesPtr tenantId) { TenantCache tenantCache = perTenantCacheMap.get(tenantId); if (tenantCache == null) { int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java index 8edeb3a..dc0ae21 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java @@ -35,7 +35,6 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -126,7 +125,7 @@ public class SpillableGroupByCache implements GroupByCache { * @param aggs * @param ctxt */ - public SpillableGroupByCache(final RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, + public SpillableGroupByCache(final RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId, ServerAggregators aggs, final int estSizeNum) { totalNumElements = 0; this.aggregators = aggs; @@ -217,7 +216,7 @@ public class SpillableGroupByCache implements GroupByCache { * implements an implicit put() of a new key/value tuple and loads it into the cache */ @Override - public Aggregator[] cache(ImmutableBytesWritable cacheKey) { + public Aggregator[] cache(ImmutableBytesPtr cacheKey) { ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey); Aggregator[] rowAggregators = cache.get(key); if (rowAggregators == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupByCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupByCache.java index 38c4ca0..68d07a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupByCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupByCache.java @@ -19,10 +19,9 @@ package org.apache.phoenix.coprocessor; import java.io.Closeable; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; - import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** * @@ -34,6 +33,6 @@ import org.apache.phoenix.expression.aggregator.Aggregator; */ public interface GroupByCache extends Closeable { long size(); - Aggregator[] cache(ImmutableBytesWritable key); + Aggregator[] cache(ImmutableBytesPtr key); RegionScanner getScanner(RegionScanner s); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 2d7c291..2c194c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; @@ -138,7 +137,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); } - ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); + ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); innerScanner = getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); @@ -239,7 +238,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private int estDistVals; - InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { + InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { int estValueSize = aggregators.getEstimatedByteSize(); long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize); TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId); @@ -257,7 +256,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } @Override - public Aggregator[] cache(ImmutableBytesWritable cacheKey) { + public Aggregator[] cache(ImmutableBytesPtr cacheKey) { ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey); Aggregator[] rowAggregators = aggregateMap.get(key); if (rowAggregators == null) { @@ -348,7 +347,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private GroupByCacheFactory() { } - GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { + GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { Configuration conf = env.getConfiguration(); boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); @@ -414,7 +413,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { hasMore = scanner.nextRaw(results); if (!results.isEmpty()) { result.setKeyValues(results); - ImmutableBytesWritable key = + ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, expressions); Aggregator[] rowAggregators = groupByCache.cache(key); // Aggregate values here @@ -457,7 +456,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } return new BaseRegionScanner(scanner) { private long rowCount = 0; - private ImmutableBytesWritable currentKey = null; + private ImmutableBytesPtr currentKey = null; @Override public boolean next(List<Cell> results) throws IOException { @@ -465,7 +464,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean atLimit; boolean aggBoundary = false; MultiKeyValueTuple result = new MultiKeyValueTuple(); - ImmutableBytesWritable key = null; + ImmutableBytesPtr key = null; Aggregator[] rowAggregators = aggregators.getAggregators(); // If we're calculating no aggregate functions, we can exit at the // start of a new row. Otherwise, we have to wait until an agg http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index bd9c5ec..4340886 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; @@ -65,7 +64,7 @@ public class HashJoinRegionScanner implements RegionScanner { private ValueBitSet[] tempSrcBitSet; @SuppressWarnings("unchecked") - public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException { + public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env) throws IOException { this.env = env; this.scanner = scanner; this.projector = projector; @@ -198,7 +197,7 @@ public class HashJoinRegionScanner implements RegionScanner { for (Iterator<Tuple> iter = resultQueue.iterator(); iter.hasNext();) { Tuple t = iter.next(); postFilter.reset(); - ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); + ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); try { if (!postFilter.evaluate(t, tempPtr)) { iter.remove(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 48e3704..ade88db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; @@ -47,6 +46,7 @@ import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.expression.function.ArrayIndexFunction; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.iterate.OffsetResultIterator; import org.apache.phoenix.iterate.OrderedResultIterator; @@ -64,12 +64,11 @@ import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.tephra.Transaction; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.tephra.Transaction; - /** * @@ -225,7 +224,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); - final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan); + final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan); if (j != null) { innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment()); } @@ -311,7 +310,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { * getting the first Tuple (which forces running through the entire region) * since after this everything is held in memory */ - private RegionScanner getTopNScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final OrderedResultIterator iterator, ImmutableBytesWritable tenantId) throws Throwable { + private RegionScanner getTopNScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final OrderedResultIterator iterator, ImmutableBytesPtr tenantId) throws Throwable { final Tuple firstTuple; TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), tenantId); long estSize = iterator.getEstimatedByteSize(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index 2679f1c..818713b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.cache.ServerCacheClient; @@ -71,7 +70,7 @@ public class PhoenixIndexMetaData implements IndexMetaData { }; } else { byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB); - ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes); + ImmutableBytesPtr tenantId = tenantIdBytes == null ? null : new ImmutableBytesPtr(tenantIdBytes); TenantCache cache = GlobalCache.getTenantCache(env, tenantId); IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid)); if (indexCache == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/dba8c92a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index d7f6f2f..b0e8a99 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -55,6 +55,7 @@ import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; import org.apache.phoenix.filter.BooleanExpressionFilter; import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.KeyRange.Bound; @@ -115,13 +116,13 @@ public class ScanUtil { // Use getTenantId and pass in column name to match against // in as PSchema attribute. If column name matches in // KeyExpressions, set on scan as attribute - public static ImmutableBytesWritable getTenantId(Scan scan) { + public static ImmutableBytesPtr getTenantId(Scan scan) { // Create Scan with special aggregation column over which to aggregate byte[] tenantId = scan.getAttribute(PhoenixRuntime.TENANT_ID_ATTRIB); if (tenantId == null) { return null; } - return new ImmutableBytesWritable(tenantId); + return new ImmutableBytesPtr(tenantId); } public static void setCustomAnnotations(Scan scan, byte[] annotations) {
