Repository: phoenix
Updated Branches:
  refs/heads/master a0ae8025e -> 9cc77c810


PHOENIX-1751 Perform aggregations, sorting, etc, in the preScannerNext instead 
of postScannerOpen (Lars Hofhansl)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9cc77c81
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9cc77c81
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9cc77c81

Branch: refs/heads/master
Commit: 9cc77c8104245a138a62ba251e926e14a74d8c11
Parents: a0ae802
Author: James Taylor <[email protected]>
Authored: Tue Aug 23 12:39:45 2016 -0700
Committer: James Taylor <[email protected]>
Committed: Wed Aug 24 09:21:28 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/RenewLeaseIT.java    |  90 +++++++++++++
 .../coprocessor/BaseScannerRegionObserver.java  | 126 +++++++++++++------
 .../coprocessor/DelegateRegionScanner.java      |   2 +-
 3 files changed, 176 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9cc77c81/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java
new file mode 100644
index 0000000..fa0bc8e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+
+public class RenewLeaseIT extends BaseOwnClusterHBaseManagedTimeIT {
+    private static final long RPC_TIMEOUT = 2000;
+    private static volatile boolean SLEEP_NOW = false;
+    private static final String TABLE_NAME = "FOO_BAR";
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put("hbase.coprocessor.region.classes", 
SleepingRegionObserver.class.getName());
+        Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        // Must update config before starting server
+        clientProps.put("hbase.rpc.timeout", Long.toString(RPC_TIMEOUT));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testLeaseDoesNotTimeout() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(url, props);
+        conn.createStatement().execute("create table " + TABLE_NAME + "(k 
VARCHAR PRIMARY KEY)");
+        SLEEP_NOW = true;
+        try {
+            ResultSet rs = conn.createStatement().executeQuery("select 
count(*) from " + TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getLong(1));
+        } finally {
+            SLEEP_NOW = false;
+        }
+    }
+    
+    public static class SleepingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public boolean preScannerNext(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+                final InternalScanner s, final List<Result> results,
+                final int limit, final boolean hasMore) throws IOException {
+            try {
+                if (SLEEP_NOW && 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString().equals(TABLE_NAME))
 {
+                    Thread.sleep(RPC_TIMEOUT * 2);
+                }
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return super.preScannerNext(c, s, results, limit, hasMore);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9cc77c81/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 39ac6fe..4fa1399 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
-import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -60,11 +58,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.tephra.Transaction;
 
 import com.google.common.collect.ImmutableList;
 
-import org.apache.tephra.Transaction;
-
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
 
@@ -189,7 +186,89 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
         return s;
     }
 
-    /**
+    private class RegionScannerHolder extends DelegateRegionScanner {
+            private final Scan scan;
+            private final ObserverContext<RegionCoprocessorEnvironment> c;
+            private boolean wasOverriden;
+            
+            public 
RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, 
final RegionScanner scanner) {
+                super(scanner);
+                this.c = c;
+                this.scan = scan;
+            }
+    
+            private void overrideDelegate() throws IOException {
+                if (wasOverriden) {
+                    return;
+                }
+                boolean success = false;
+                // Save the current span. When done with the child span, reset 
the span back to
+                // what it was. Otherwise, this causes the thread local 
storing the current span
+                // to not be reset back to null causing catastrophic infinite 
loops
+                // and region servers to crash. See 
https://issues.apache.org/jira/browse/PHOENIX-1596
+                // TraceScope can't be used here because closing the scope 
will end up calling
+                // currentSpan.stop() and that should happen only when we are 
closing the scanner.
+                final Span savedSpan = Trace.currentSpan();
+                final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, 
savedSpan).getSpan();
+                try {
+                    RegionScanner scanner = doPostScannerOpen(c, scan, 
delegate);
+                    scanner = new DelegateRegionScanner(scanner) {
+                        // This isn't very obvious but close() could be called 
in a thread
+                        // that is different from the thread that created the 
scanner.
+                        @Override
+                        public void close() throws IOException {
+                            try {
+                                delegate.close();
+                            } finally {
+                                if (child != null) {
+                                    child.stop();
+                                }
+                            }
+                        }
+                    };
+                    this.delegate = scanner;
+                    wasOverriden = true;
+                    success = true;
+                } catch (Throwable t) {
+                    
ServerUtil.throwIOException(c.getEnvironment().getRegionInfo().getRegionNameAsString(),
 t);
+                } finally {
+                    try {
+                        if (!success && child != null) {
+                            child.stop();
+                        }
+                    } finally {
+                        Trace.continueSpan(savedSpan);
+                    }
+                }
+            }
+            
+            @Override
+            public boolean next(List<Cell> result, ScannerContext 
scannerContext) throws IOException {
+                overrideDelegate();
+                return super.next(result, scannerContext);
+            }
+
+            @Override
+            public boolean next(List<Cell> result) throws IOException {
+                overrideDelegate();
+                return super.next(result);
+            }
+
+            @Override
+            public boolean nextRaw(List<Cell> result, ScannerContext 
scannerContext) throws IOException {
+                overrideDelegate();
+                return super.nextRaw(result, scannerContext);
+            }
+            
+            @Override
+            public boolean nextRaw(List<Cell> result) throws IOException {
+                overrideDelegate();
+                return super.nextRaw(result);
+            }
+        }
+        
+
+        /**
      * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, 
RegionScanner)} that ensures no non IOException is thrown,
      * to prevent the coprocessor from becoming blacklisted.
      *
@@ -202,42 +281,7 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
             if (!isRegionObserverFor(scan)) {
                 return s;
             }
-            boolean success = false;
-            // Save the current span. When done with the child span, reset the 
span back to
-            // what it was. Otherwise, this causes the thread local storing 
the current span
-            // to not be reset back to null causing catastrophic infinite loops
-            // and region servers to crash. See 
https://issues.apache.org/jira/browse/PHOENIX-1596
-            // TraceScope can't be used here because closing the scope will 
end up calling
-            // currentSpan.stop() and that should happen only when we are 
closing the scanner.
-            final Span savedSpan = Trace.currentSpan();
-            final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, 
savedSpan).getSpan();
-            try {
-                RegionScanner scanner = doPostScannerOpen(c, scan, s);
-                scanner = new DelegateRegionScanner(scanner) {
-                    // This isn't very obvious but close() could be called in 
a thread
-                    // that is different from the thread that created the 
scanner.
-                    @Override
-                    public void close() throws IOException {
-                        try {
-                            delegate.close();
-                        } finally {
-                            if (child != null) {
-                                child.stop();
-                            }
-                        }
-                    }
-                };
-                success = true;
-                return scanner;
-            } finally {
-                try {
-                    if (!success && child != null) {
-                        child.stop();
-                    }
-                } finally {
-                    Trace.continueSpan(savedSpan);
-                }
-            }
+            return new RegionScannerHolder(c, scan, s);
         } catch (Throwable t) {
             // If the exception is NotServingRegionException then throw it as
             // StaleRegionBoundaryCacheException to handle it by phoenix 
client other wise hbase

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9cc77c81/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index 43c35a8..0ddabed 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 
 public class DelegateRegionScanner implements RegionScanner {
 
-    protected final RegionScanner delegate;
+    protected RegionScanner delegate;
 
     public DelegateRegionScanner(RegionScanner scanner) {
         this.delegate = scanner;

Reply via email to