Repository: phoenix Updated Branches: refs/heads/4.8-HBase-1.1 9f8499d36 -> 66005d1cb
PHOENIX-1751 Perform aggregations, sorting, etc, in the preScannerNext instead of postScannerOpen (Lars Hofhansl) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/66005d1c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/66005d1c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/66005d1c Branch: refs/heads/4.8-HBase-1.1 Commit: 66005d1cb623a86bcc90e2182953d6b76a00aef4 Parents: 9f8499d Author: James Taylor <[email protected]> Authored: Tue Aug 23 12:39:45 2016 -0700 Committer: James Taylor <[email protected]> Committed: Thu Aug 25 00:39:37 2016 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/RenewLeaseIT.java | 90 +++++++++++++ .../coprocessor/BaseScannerRegionObserver.java | 126 +++++++++++++------ .../coprocessor/DelegateRegionScanner.java | 2 +- 3 files changed, 176 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/66005d1c/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java new file mode 100644 index 0000000..fa0bc8e --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you maynot use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicablelaw or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + + +public class RenewLeaseIT extends BaseOwnClusterHBaseManagedTimeIT { + private static final long RPC_TIMEOUT = 2000; + private static volatile boolean SLEEP_NOW = false; + private static final String TABLE_NAME = "FOO_BAR"; + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1); + serverProps.put("hbase.coprocessor.region.classes", SleepingRegionObserver.class.getName()); + Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1); + // Must update config before starting server + clientProps.put("hbase.rpc.timeout", Long.toString(RPC_TIMEOUT)); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Test + public void testLeaseDoesNotTimeout() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(url, props); + conn.createStatement().execute("create table " + TABLE_NAME + "(k VARCHAR PRIMARY KEY)"); + SLEEP_NOW = true; + try { + ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + TABLE_NAME); + assertTrue(rs.next()); + assertEquals(0, rs.getLong(1)); + } finally { + SLEEP_NOW = false; + } + } + + public static class SleepingRegionObserver extends SimpleRegionObserver { + @Override + public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c, + final InternalScanner s, final List<Result> results, + final int limit, final boolean hasMore) throws IOException { + try { + if (SLEEP_NOW && c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString().equals(TABLE_NAME)) { + Thread.sleep(RPC_TIMEOUT * 2); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + return super.preScannerNext(c, s, results, limit, hasMore); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/66005d1c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 39ac6fe..4fa1399 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; @@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Span; import org.apache.htrace.Trace; -import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -60,11 +58,10 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.tephra.Transaction; import com.google.common.collect.ImmutableList; -import org.apache.tephra.Transaction; - abstract public class BaseScannerRegionObserver extends BaseRegionObserver { @@ -189,7 +186,89 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { return s; } - /** + private class RegionScannerHolder extends DelegateRegionScanner { + private final Scan scan; + private final ObserverContext<RegionCoprocessorEnvironment> c; + private boolean wasOverriden; + + public RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, final RegionScanner scanner) { + super(scanner); + this.c = c; + this.scan = scan; + } + + private void overrideDelegate() throws IOException { + if (wasOverriden) { + return; + } + boolean success = false; + // Save the current span. When done with the child span, reset the span back to + // what it was. Otherwise, this causes the thread local storing the current span + // to not be reset back to null causing catastrophic infinite loops + // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596 + // TraceScope can't be used here because closing the scope will end up calling + // currentSpan.stop() and that should happen only when we are closing the scanner. + final Span savedSpan = Trace.currentSpan(); + final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan(); + try { + RegionScanner scanner = doPostScannerOpen(c, scan, delegate); + scanner = new DelegateRegionScanner(scanner) { + // This isn't very obvious but close() could be called in a thread + // that is different from the thread that created the scanner. + @Override + public void close() throws IOException { + try { + delegate.close(); + } finally { + if (child != null) { + child.stop(); + } + } + } + }; + this.delegate = scanner; + wasOverriden = true; + success = true; + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegionInfo().getRegionNameAsString(), t); + } finally { + try { + if (!success && child != null) { + child.stop(); + } + } finally { + Trace.continueSpan(savedSpan); + } + } + } + + @Override + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + overrideDelegate(); + return super.next(result, scannerContext); + } + + @Override + public boolean next(List<Cell> result) throws IOException { + overrideDelegate(); + return super.next(result); + } + + @Override + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { + overrideDelegate(); + return super.nextRaw(result, scannerContext); + } + + @Override + public boolean nextRaw(List<Cell> result) throws IOException { + overrideDelegate(); + return super.nextRaw(result); + } + } + + + /** * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown, * to prevent the coprocessor from becoming blacklisted. * @@ -202,42 +281,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { if (!isRegionObserverFor(scan)) { return s; } - boolean success = false; - // Save the current span. When done with the child span, reset the span back to - // what it was. Otherwise, this causes the thread local storing the current span - // to not be reset back to null causing catastrophic infinite loops - // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596 - // TraceScope can't be used here because closing the scope will end up calling - // currentSpan.stop() and that should happen only when we are closing the scanner. - final Span savedSpan = Trace.currentSpan(); - final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan(); - try { - RegionScanner scanner = doPostScannerOpen(c, scan, s); - scanner = new DelegateRegionScanner(scanner) { - // This isn't very obvious but close() could be called in a thread - // that is different from the thread that created the scanner. - @Override - public void close() throws IOException { - try { - delegate.close(); - } finally { - if (child != null) { - child.stop(); - } - } - } - }; - success = true; - return scanner; - } finally { - try { - if (!success && child != null) { - child.stop(); - } - } finally { - Trace.continueSpan(savedSpan); - } - } + return new RegionScannerHolder(c, scan, s); } catch (Throwable t) { // If the exception is NotServingRegionException then throw it as // StaleRegionBoundaryCacheException to handle it by phoenix client other wise hbase http://git-wip-us.apache.org/repos/asf/phoenix/blob/66005d1c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index 43c35a8..0ddabed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext; public class DelegateRegionScanner implements RegionScanner { - protected final RegionScanner delegate; + protected RegionScanner delegate; public DelegateRegionScanner(RegionScanner scanner) { this.delegate = scanner;
