Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 33a2ba772 -> 8fe0b86ce


PHOENIX-2935 IndexMetaData cache can expire while a delete and/or query is 
running on the server


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8fe0b86c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8fe0b86c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8fe0b86c

Branch: refs/heads/4.x-HBase-0.98
Commit: 8fe0b86ce15712c796188bf5f358dcbcbcfd8b2b
Parents: 33a2ba7
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Wed Oct 12 14:13:17 2016 +0530
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Wed Oct 12 14:13:17 2016 +0530

----------------------------------------------------------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |  8 ++--
 .../apache/phoenix/compile/UpsertCompiler.java  | 44 +++++++++-----------
 .../UngroupedAggregateRegionObserver.java       | 22 +++++++---
 .../phoenix/index/PhoenixIndexMetaData.java     |  7 ++--
 4 files changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8fe0b86c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 42efd68..e0881cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -45,7 +46,6 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -583,10 +583,10 @@ public class DeleteCompiler {
                         ServerCache cache = null;
                         try {
                             if (ptr.getLength() > 0) {
-                                IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
-                                cache = 
client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
-                                byte[] uuidValue = cache.getId();
+                                byte[] uuidValue = 
ServerCacheClient.generateId();
                                 
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get());
+                                
context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                             }
                             ResultIterator iterator = aggPlan.iterator();
                             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8fe0b86c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 0922de2..3f9e6b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -52,7 +52,6 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -726,32 +725,27 @@ public class UpsertCompiler {
                             table.getIndexMaintainers(ptr, 
context.getConnection());
                             byte[] txState = table.isTransactional() ? 
connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
 
-                            ServerCache cache = null;
+                            if (ptr.getLength() > 0) {
+                                byte[] uuidValue = 
ServerCacheClient.generateId();
+                                
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                scan.setAttribute(PhoenixIndexCodec.INDEX_MD, 
ptr.get());
+                                
scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                            }
+                            ResultIterator iterator = aggPlan.iterator();
                             try {
-                                if (ptr.getLength() > 0) {
-                                    IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
-                                    cache = 
client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
-                                    byte[] uuidValue = cache.getId();
-                                    
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                }
-                                ResultIterator iterator = aggPlan.iterator();
-                                try {
-                                    Tuple row = iterator.next();
-                                    final long mutationCount = 
(Long)aggProjector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                                    return new MutationState(maxSize, 
connection) {
-                                        @Override
-                                        public long getUpdateCount() {
-                                            return mutationCount;
-                                        }
-                                    };
-                                } finally {
-                                    iterator.close();
-                                }
+                                Tuple row = iterator.next();
+                                final long mutationCount = 
(Long)aggProjector.getColumnProjector(0).getValue(row,
+                                        PLong.INSTANCE, ptr);
+                                return new MutationState(maxSize, connection) {
+                                    @Override
+                                    public long getUpdateCount() {
+                                        return mutationCount;
+                                    }
+                                };
                             } finally {
-                                if (cache != null) {
-                                    cache.close();
-                                }
+                                iterator.close();
                             }
+                            
                         }
     
                         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8fe0b86c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 4c6a087..76bd288 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -177,8 +177,15 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
 
-    private void commitBatch(HRegion region, List<Mutation> mutations, byte[] 
indexUUID,
-            long blockingMemstoreSize) throws IOException {
+    private void commitBatch(HRegion region, List<Mutation> mutations, byte[] 
indexUUID, long blockingMemstoreSize,
+            byte[] indexMaintainersPtr, byte[] txState) throws IOException {
+        if (indexMaintainersPtr != null) {
+            mutations.get(0).setAttribute(PhoenixIndexCodec.INDEX_MD, 
indexMaintainersPtr);
+        }
+
+        if (txState != null) {
+            mutations.get(0).setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
+        }
       if (indexUUID != null) {
           for (Mutation m : mutations) {
               m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
@@ -290,6 +297,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         RegionScanner theScanner = s;
         
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+        byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
         List<Expression> selectExpressions = null;
         byte[] upsertSelectTable = 
scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
         boolean isUpsert = false;
@@ -372,6 +380,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         }
         long rowCount = 0;
         final RegionScanner innerScanner = theScanner;
+        byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         boolean acquiredLock = false;
         try {
             if(needToWrite) {
@@ -595,13 +604,14 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                             // Commit in batches based on 
UPSERT_BATCH_SIZE_ATTRIB in config
                             if (!mutations.isEmpty() && batchSize > 0 &&
                                     mutations.size() % batchSize == 0) {
-                                commitBatch(region, mutations, indexUUID, 
blockingMemStoreSize);
+                                commitBatch(region, mutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr,
+                                        txState);
                                 mutations.clear();
                             }
                             // Commit in batches based on 
UPSERT_BATCH_SIZE_ATTRIB in config
                             if (!indexMutations.isEmpty() && batchSize > 0 &&
                                     indexMutations.size() % batchSize == 0) {
-                                commitBatch(region, indexMutations, null, 
blockingMemStoreSize);
+                                commitBatch(region, indexMutations, null, 
blockingMemStoreSize, null, txState);
                                 indexMutations.clear();
                             }
                         } catch (ConstraintViolationException e) {
@@ -617,11 +627,11 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                     }
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
-                    commitBatch(region,mutations, indexUUID, 
blockingMemStoreSize);
+                    commitBatch(region, mutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr, txState);
                 }
 
                 if (!indexMutations.isEmpty()) {
-                    commitBatch(region,indexMutations, null, 
blockingMemStoreSize);
+                    commitBatch(region, indexMutations, null, 
blockingMemStoreSize, indexMaintainersPtr, txState);
                     indexMutations.clear();
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8fe0b86c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 818713b..d22e957 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -74,9 +74,10 @@ public class PhoenixIndexMetaData implements IndexMetaData {
             TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
             IndexMetaDataCache indexCache = 
(IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
             if (indexCache == null) {
-                String msg = "key=" + ServerCacheClient.idToString(uuid) + " 
region=" + env.getRegion();
-                SQLException e = new 
SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
-                        .setMessage(msg).build().buildException();
+                String msg = "key=" + ServerCacheClient.idToString(uuid) + " 
region=" + env.getRegion() + "host="
+                        + env.getRegionServerServices().getServerName();
+                SQLException e = new 
SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND).setMessage(msg)
+                        .build().buildException();
                 ServerUtil.throwIOException("Index update failed", e); // will 
not return
             }
             return indexCache;

Reply via email to