Repository: phoenix Updated Branches: refs/heads/4.8-HBase-0.98 6e27b324b -> c87fdfb8d
PHOENIX-1751 Perform aggregations, sorting, etc, in the preScannerNext instead of postScannerOpen (Lars Hofhansl) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c87fdfb8 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c87fdfb8 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c87fdfb8 Branch: refs/heads/4.8-HBase-0.98 Commit: c87fdfb8d63b904b76cfb0998ac389e4b7f31231 Parents: 6e27b32 Author: James Taylor <[email protected]> Authored: Tue Aug 23 12:39:45 2016 -0700 Committer: James Taylor <[email protected]> Committed: Thu Aug 25 11:33:56 2016 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/RenewLeaseIT.java | 92 ++++++++++++++ .../coprocessor/BaseScannerRegionObserver.java | 125 +++++++++++++------ .../coprocessor/DelegateRegionScanner.java | 2 +- 3 files changed, 178 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c87fdfb8/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java new file mode 100644 index 0000000..12ea073 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RenewLeaseIT.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.Maps; + + +public class RenewLeaseIT extends BaseOwnClusterHBaseManagedTimeIT { + private static final long RPC_TIMEOUT = 2000; + private static volatile boolean SLEEP_NOW = false; + private static final String TABLE_NAME = "FOO_BAR"; + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1); + serverProps.put("hbase.coprocessor.region.classes", SleepingRegionObserver.class.getName()); + Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1); + // Must update config before starting server + clientProps.put("hbase.rpc.timeout", Long.toString(RPC_TIMEOUT)); + 
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Ignore("Requires fix for HBASE-16503") + @Test + public void testLeaseDoesNotTimeout() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(url, props); + conn.createStatement().execute("create table " + TABLE_NAME + "(k VARCHAR PRIMARY KEY)"); + SLEEP_NOW = true; + try { + ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + TABLE_NAME); + assertTrue(rs.next()); + assertEquals(0, rs.getLong(1)); + } finally { + SLEEP_NOW = false; + } + } + + public static class SleepingRegionObserver extends SimpleRegionObserver { + @Override + public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c, + final InternalScanner s, final List<Result> results, + final int limit, final boolean hasMore) throws IOException { + try { + if (SLEEP_NOW && c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString().equals(TABLE_NAME)) { + Thread.sleep(RPC_TIMEOUT * 2); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + return super.preScannerNext(c, s, results, limit, hasMore); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c87fdfb8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index be4766f..5320971 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; @@ -56,13 +55,12 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.tephra.Transaction; import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; import com.google.common.collect.ImmutableList; -import org.apache.tephra.Transaction; - abstract public class BaseScannerRegionObserver extends BaseRegionObserver { @@ -187,7 +185,89 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { return s; } - /** + private class RegionScannerHolder extends DelegateRegionScanner { + private final Scan scan; + private final ObserverContext<RegionCoprocessorEnvironment> c; + private boolean wasOverriden; + + public RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, final RegionScanner scanner) { + super(scanner); + this.c = c; + this.scan = scan; + } + + private void overrideDelegate() throws IOException { + if (wasOverriden) { + return; + } + boolean success = false; + // Save the current span. When done with the child span, reset the span back to + // what it was. Otherwise, this causes the thread local storing the current span + // to not be reset back to null causing catastrophic infinite loops + // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596 + // TraceScope can't be used here because closing the scope will end up calling + // currentSpan.stop() and that should happen only when we are closing the scanner. 
+ final Span savedSpan = Trace.currentSpan(); + final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan(); + try { + RegionScanner scanner = doPostScannerOpen(c, scan, delegate); + scanner = new DelegateRegionScanner(scanner) { + // This isn't very obvious but close() could be called in a thread + // that is different from the thread that created the scanner. + @Override + public void close() throws IOException { + try { + delegate.close(); + } finally { + if (child != null) { + child.stop(); + } + } + } + }; + this.delegate = scanner; + wasOverriden = true; + success = true; + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegionInfo().getRegionNameAsString(), t); + } finally { + try { + if (!success && child != null) { + child.stop(); + } + } finally { + Trace.continueSpan(savedSpan); + } + } + } + + @Override + public boolean next(List<Cell> result, int maxRows) throws IOException { + overrideDelegate(); + return super.next(result, maxRows); + } + + @Override + public boolean next(List<Cell> result) throws IOException { + overrideDelegate(); + return super.next(result); + } + + @Override + public boolean nextRaw(List<Cell> result, int maxRows) throws IOException { + overrideDelegate(); + return super.nextRaw(result, maxRows); + } + + @Override + public boolean nextRaw(List<Cell> result) throws IOException { + overrideDelegate(); + return super.nextRaw(result); + } + } + + + /** * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown, * to prevent the coprocessor from becoming blacklisted. * @@ -200,42 +280,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { if (!isRegionObserverFor(scan)) { return s; } - boolean success = false; - // Save the current span. When done with the child span, reset the span back to - // what it was. 
Otherwise, this causes the thread local storing the current span - // to not be reset back to null causing catastrophic infinite loops - // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596 - // TraceScope can't be used here because closing the scope will end up calling - // currentSpan.stop() and that should happen only when we are closing the scanner. - final Span savedSpan = Trace.currentSpan(); - final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan(); - try { - RegionScanner scanner = doPostScannerOpen(c, scan, s); - scanner = new DelegateRegionScanner(scanner) { - // This isn't very obvious but close() could be called in a thread - // that is different from the thread that created the scanner. - @Override - public void close() throws IOException { - try { - delegate.close(); - } finally { - if (child != null) { - child.stop(); - } - } - } - }; - success = true; - return scanner; - } finally { - try { - if (!success && child != null) { - child.stop(); - } - } finally { - Trace.continueSpan(savedSpan); - } - } + return new RegionScannerHolder(c, scan, s); } catch (Throwable t) { // If the exception is NotServingRegionException then throw it as // StaleRegionBoundaryCacheException to handle it by phoenix client other wise hbase http://git-wip-us.apache.org/repos/asf/phoenix/blob/c87fdfb8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index f88a931..8cb6dac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -25,7 +25,7 @@ import 
org.apache.hadoop.hbase.regionserver.RegionScanner; public class DelegateRegionScanner implements RegionScanner { - protected final RegionScanner delegate; + protected RegionScanner delegate; public DelegateRegionScanner(RegionScanner scanner) { this.delegate = scanner;
