Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 5002aa462 -> 0a78f055b
PHOENIX-2935 IndexMetaData cache can expire when a delete and or query running on server Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0a78f055 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0a78f055 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0a78f055 Branch: refs/heads/4.x-HBase-1.1 Commit: 0a78f055b40148d294260cace0a7108b7af1c2c7 Parents: 5002aa4 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Wed Oct 12 14:13:46 2016 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Wed Oct 12 14:13:46 2016 +0530 ---------------------------------------------------------------------- .../apache/phoenix/compile/DeleteCompiler.java | 8 +-- .../apache/phoenix/compile/UpsertCompiler.java | 51 ++++++++------------ .../UngroupedAggregateRegionObserver.java | 22 ++++++--- .../phoenix/index/PhoenixIndexMetaData.java | 7 +-- 4 files changed, 44 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/0a78f055/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 42efd68..e0881cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -32,6 +32,7 @@ import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; @@ -45,7 +46,6 @@ import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -583,10 +583,10 @@ public class DeleteCompiler { ServerCache cache = null; try { if (ptr.getLength() > 0) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState); - byte[] uuidValue = cache.getId(); + byte[] uuidValue = ServerCacheClient.generateId(); context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + context.getScan().setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get()); + context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } ResultIterator iterator = aggPlan.iterator(); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/0a78f055/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 26855aa..3f9e6b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -33,14 +33,10 @@ import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.HRegionLocator; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.cache.ServerCacheClient.ServerCache; +import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -56,7 +52,6 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; -import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -87,14 +82,13 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.tuple.Tuple; @@ -731,32 +725,27 @@ public class UpsertCompiler { table.getIndexMaintainers(ptr, context.getConnection()); byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; - ServerCache cache = null; + if (ptr.getLength() > 0) { + byte[] uuidValue = ServerCacheClient.generateId(); + scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + scan.setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get()); + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + ResultIterator iterator = aggPlan.iterator(); try { - if (ptr.getLength() > 0) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState); - byte[] uuidValue = cache.getId(); - scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - } - ResultIterator iterator = aggPlan.iterator(); - try { - Tuple row = iterator.next(); - final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); - return new MutationState(maxSize, connection) { - @Override - public long getUpdateCount() { - return mutationCount; - } - }; - } finally { - iterator.close(); - } + Tuple row = iterator.next(); + final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, + PLong.INSTANCE, ptr); + return new MutationState(maxSize, connection) { + @Override + public long getUpdateCount() { + return mutationCount; + } + }; } finally { - if (cache != null) { - cache.close(); - } + iterator.close(); } + } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/0a78f055/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index d8d313d..0f175c5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -177,8 +177,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver this.kvBuilder = GenericKeyValueBuilder.INSTANCE; } - private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID, - long blockingMemstoreSize) throws IOException { + private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize, + byte[] indexMaintainersPtr, byte[] txState) throws IOException { + if (indexMaintainersPtr != null) { + mutations.get(0).setAttribute(PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); + } + + if (txState != null) { + mutations.get(0).setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } if (indexUUID != null) { for (Mutation m : mutations) { m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); @@ -291,6 +298,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver RegionScanner theScanner = s; byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); + byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); List<Expression> selectExpressions = null; byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); boolean isUpsert = false; @@ -373,6 +381,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } long rowCount = 0; final RegionScanner innerScanner = theScanner; + byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); boolean acquiredLock = false; try { if(needToWrite) { @@ -596,13 +605,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) { - commitBatch(region, mutations, indexUUID, blockingMemStoreSize); + commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, + txState); mutations.clear(); } // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config if (!indexMutations.isEmpty() && batchSize > 0 && indexMutations.size() % batchSize == 0) { - commitBatch(region, indexMutations, null, blockingMemStoreSize); + commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState); indexMutations.clear(); } } catch (ConstraintViolationException e) { @@ -618,11 +628,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } while (hasMore); if (!mutations.isEmpty()) { - commitBatch(region,mutations, indexUUID, blockingMemStoreSize); + commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState); } if (!indexMutations.isEmpty()) { - commitBatch(region,indexMutations, null, blockingMemStoreSize); + commitBatch(region, indexMutations, null, blockingMemStoreSize, indexMaintainersPtr, txState); indexMutations.clear(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0a78f055/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index 818713b..d22e957 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -74,9 +74,10 @@ public class PhoenixIndexMetaData implements IndexMetaData { TenantCache cache = GlobalCache.getTenantCache(env, tenantId); IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid)); if (indexCache == null) { - String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion(); - SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND) - .setMessage(msg).build().buildException(); + String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion() + "host=" + + env.getRegionServerServices().getServerName(); + SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND).setMessage(msg) + .build().buildException(); ServerUtil.throwIOException("Index update failed", e); // will not return } return indexCache;