PHOENIX-2357 Prevent queries from failing because of scanners running into lease expiration
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5fb51ca5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5fb51ca5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5fb51ca5 Branch: refs/heads/4.x-HBase-1.0 Commit: 5fb51ca5ebd97493e299ffc1e3d7c35fb68be612 Parents: 7f555aa Author: Samarth <[email protected]> Authored: Tue Jan 5 23:05:06 2016 -0800 Committer: Samarth <[email protected]> Committed: Tue Jan 5 23:05:06 2016 -0800 ---------------------------------------------------------------------- .../DelayedTableResultIteratorFactory.java | 71 +++++++ .../iterate/MockParallelIteratorFactory.java | 2 +- .../phoenix/iterate/MockResultIterator.java | 66 +++++++ .../iterate/MockTableResultIterator.java | 66 ------- .../iterate/RenewLeaseOnlyTableIterator.java | 63 ++++++ .../phoenix/iterate/ScannerLeaseRenewalIT.java | 176 +++++++++++++++++ .../phoenix/iterate/BaseResultIterators.java | 2 +- .../phoenix/iterate/ChunkedResultIterator.java | 3 +- .../DefaultTableResultIteratorFactory.java | 35 ++++ .../phoenix/iterate/ParallelIterators.java | 11 +- .../phoenix/iterate/ScanningResultIterator.java | 19 +- .../apache/phoenix/iterate/SerialIterators.java | 7 +- .../phoenix/iterate/TableResultIterator.java | 147 +++++++++----- .../iterate/TableResultIteratorFactory.java | 29 +++ .../apache/phoenix/jdbc/PhoenixConnection.java | 40 +++- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 +- .../phoenix/mapreduce/PhoenixRecordReader.java | 3 +- .../phoenix/query/ConnectionQueryServices.java | 8 +- .../query/ConnectionQueryServicesImpl.java | 196 +++++++++++++++++-- .../query/ConnectionlessQueryServicesImpl.java | 10 + .../query/DelegateConnectionQueryServices.java | 11 ++ .../org/apache/phoenix/query/QueryServices.java | 5 + .../phoenix/query/QueryServicesOptions.java | 25 ++- .../java/org/apache/phoenix/query/BaseTest.java | 2 +- .../phoenix/query/QueryServicesTestImpl.java | 4 +- .../phoenix/query/ScannerLeaseRenewalTest.java | 132 +++++++++++++ .../apache/phoenix/util/MetaDataUtilTest.java | 1 + pom.xml | 2 +- 28 files changed, 974 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java new file mode 100644 index 0000000..6545a45 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.Tuple; + +public class DelayedTableResultIteratorFactory implements TableResultIteratorFactory { + + private final long delay; + + public DelayedTableResultIteratorFactory(long delay) { + this.delay = delay; + } + + @Override + public TableResultIterator newIterator(StatementContext context, TableRef tableRef, Scan scan, + CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException { + return new DelayedTableResultIterator(context, tableRef, scan, scanMetrics, renewLeaseThreshold); + } + + private class DelayedTableResultIterator extends TableResultIterator { + public DelayedTableResultIterator (StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException { + super(context.getConnection().getMutationState(), tableRef, scan, scanMetrics, renewLeaseThreshold); + } + + @Override + public synchronized void initScanner() throws SQLException { + super.initScanner(); + } + + @Override + public Tuple next() throws SQLException { + delay(); + return super.next(); + } + + private void delay() { + try { + new CountDownLatch(1).await(delay, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java index 1161bae..d8a08e6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java @@ -34,7 +34,7 @@ public class MockParallelIteratorFactory implements ParallelIteratorFactory { @Override public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException { - return new MockTableResultIterator(String.valueOf(counter.incrementAndGet()), table); + return new MockResultIterator(String.valueOf(counter.incrementAndGet()), table); } public void setTable(PTable table) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java new file mode 100644 index 0000000..b26dfcf --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.tuple.Tuple; + +/** + * Mock result iterator that returns its id as a string in a {@code Tuple} when {@link #next()} and {@link #peek()} are called. + */ +public class MockResultIterator implements PeekingResultIterator { + + private final Tuple tuple; + + public MockResultIterator(String id, PTable table) { + TupleProjector projector = new TupleProjector(table); + List<Cell> result = new ArrayList<>(); + result.add(new KeyValue(Bytes.toBytes(id), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(id))); + this.tuple = projector.projectResults(new ResultTuple(Result.create(result))); + } + + @Override + public Tuple next() throws SQLException { + return tuple; + } + + @Override + public void explain(List<String> planSteps) {} + + @Override + public void close() throws SQLException {} + + @Override + public Tuple peek() throws SQLException { + return tuple; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java deleted file mode 100644 index f0da101..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.iterate; - -import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; -import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; - -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.execute.TupleProjector; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.tuple.ResultTuple; -import org.apache.phoenix.schema.tuple.Tuple; - -/** - * Mock result iterator that returns its id as a string in a {@code Tuple} when {@link #next()} and {@link #peek()} are called. - */ -public class MockTableResultIterator implements PeekingResultIterator { - - private final Tuple tuple; - - public MockTableResultIterator(String id, PTable table) { - TupleProjector projector = new TupleProjector(table); - List<Cell> result = new ArrayList<>(); - result.add(new KeyValue(Bytes.toBytes(id), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(id))); - this.tuple = projector.projectResults(new ResultTuple(Result.create(result))); - } - - @Override - public Tuple next() throws SQLException { - return tuple; - } - - @Override - public void explain(List<String> planSteps) {} - - @Override - public void close() throws SQLException {} - - @Override - public Tuple peek() throws SQLException { - return tuple; - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java new file mode 100644 index 0000000..5fa4126 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; + +import java.sql.SQLException; + +public class RenewLeaseOnlyTableIterator extends TableResultIterator { + + private final int numberOfLeaseRenewals; + private final int thresholdNotReachedAt; + private final int doNotRenewLeaseAt; + private int counter = 0; + private RenewLeaseStatus lastRenewLeaseStatus; + + public RenewLeaseOnlyTableIterator(int renewLeaseCount, int skipRenewLeaseAt, int doNotRenewLeaseAt) throws SQLException { + super(); + checkArgument(renewLeaseCount >= skipRenewLeaseAt); + this.numberOfLeaseRenewals = renewLeaseCount; + this.thresholdNotReachedAt = skipRenewLeaseAt; + this.doNotRenewLeaseAt = doNotRenewLeaseAt; + } + + @Override + public RenewLeaseStatus renewLease() { + counter++; + if (counter == thresholdNotReachedAt) { + lastRenewLeaseStatus = THRESHOLD_NOT_REACHED; + } else if (counter == doNotRenewLeaseAt) { + lastRenewLeaseStatus = NOT_RENEWED; + } else if (counter <= numberOfLeaseRenewals) { + lastRenewLeaseStatus = RENEWED; + } else { + lastRenewLeaseStatus = CLOSED; + } + return lastRenewLeaseStatus; + } + + public RenewLeaseStatus getLastRenewLeaseStatus() { + return lastRenewLeaseStatus; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java new file mode 100644 index 0000000..7b42a77 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_THREAD_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_THRESHOLD_MILLISECONDS; +import static org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.ConnectionQueryServices.Feature; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(NeedsOwnMiniClusterTest.class) +public class ScannerLeaseRenewalIT { + private static HBaseTestingUtility hbaseTestUtil; + private static String zkQuorum; + // If tests are failing because of scanner expiration errors in mini cluster startup, + // increase this timeout. It would end up increasing the duration the tests run too, though. + private static final long LEASE_TIMEOUT_PERIOD_MILLIS = + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 10; + private static String url; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = HBaseConfiguration.create(); + hbaseTestUtil = new HBaseTestingUtility(conf); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, LEASE_TIMEOUT_PERIOD_MILLIS); + hbaseTestUtil.startMiniCluster(); + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + + Properties driverProps = PropertiesUtil.deepCopy(TEST_PROPERTIES); + driverProps.put(RENEW_LEASE_THREAD_POOL_SIZE, Long.toString(4)); + + // if this property is false, tests will fail with UnknownScannerException errors. + driverProps.put(RENEW_LEASE_ENABLED, Boolean.toString(true)); + + driverProps.put(RENEW_LEASE_THRESHOLD_MILLISECONDS, + Long.toString(LEASE_TIMEOUT_PERIOD_MILLIS / 2)); + driverProps.put(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, + Long.toString(LEASE_TIMEOUT_PERIOD_MILLIS / 4)); + driverProps.put(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + Long.toString(LEASE_TIMEOUT_PERIOD_MILLIS)); + // use round robin iterator + driverProps.put(FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + try (PhoenixConnection phxConn = DriverManager.getConnection(url, driverProps).unwrap(PhoenixConnection.class)) { + // run test methods only if we are at the hbase version that supports lease renewal. + Assume.assumeTrue(phxConn.getQueryServices().supportsFeature(Feature.RENEW_LEASE)); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + + @Test + public void testRenewLeasePreventsSelectQueryFromFailing() throws Exception { + String tableName = "testRenewLeasePreventsSelectQueryFromFailing"; + int numRecords = 5; + try (Connection conn = DriverManager.getConnection(url)) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)"); + int i = 0; + String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + Random random = new Random(); + PreparedStatement stmt = conn.prepareStatement(upsert); + while (i < numRecords) { + stmt.setInt(1, random.nextInt()); + stmt.setString(2, "KV" + random.nextInt()); + stmt.executeUpdate(); + i++; + } + conn.commit(); + } + + try (PhoenixConnection phxConn = + DriverManager.getConnection(url).unwrap(PhoenixConnection.class)) { + String sql = "SELECT * FROM " + tableName; + // at every next call wait for this period. This will cause lease to expire. + long delayOnNext = 2 * LEASE_TIMEOUT_PERIOD_MILLIS; + phxConn.setTableResultIteratorFactory(new DelayedTableResultIteratorFactory(delayOnNext)); + Statement s = phxConn.createStatement(); + s.setFetchSize(2); + ResultSet rs = s.executeQuery(sql); + int count = 0; + while (rs.next()) { + count++; + } + assertEquals(numRecords, count); + } + } + + @Test + public void testRenewLeasePreventsUpsertSelectFromFailing() throws Exception { + String table1 = "testRenewLeasePreventsUpsertSelectFromFailing"; + String table2 = "testRenewLeasePreventsUpsertSelectFromFailing2"; + + try (Connection conn = DriverManager.getConnection(url)) { + conn.createStatement().execute( + "CREATE TABLE " + table1 + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)"); + conn.createStatement().execute( + "CREATE TABLE " + table2 + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)"); + int numRecords = 5; + int i = 0; + String upsert = "UPSERT INTO " + table1 + " VALUES (?, ?)"; + Random random = new Random(); + PreparedStatement stmt = conn.prepareStatement(upsert); + while (i < numRecords) { + stmt.setInt(1, random.nextInt()); + stmt.setString(2, "KV" + random.nextInt()); + stmt.executeUpdate(); + i++; + } + conn.commit(); + } + + try (PhoenixConnection phxConn = + DriverManager.getConnection(url).unwrap(PhoenixConnection.class)) { + String upsertSelect = "UPSERT INTO " + table2 + " SELECT PK1, KV1 FROM " + table1; + // at every next call wait for this period. This will cause lease to expire. + long delayAfterInit = 2 * LEASE_TIMEOUT_PERIOD_MILLIS; + phxConn.setTableResultIteratorFactory(new DelayedTableResultIteratorFactory( + delayAfterInit)); + Statement s = phxConn.createStatement(); + s.setFetchSize(2); + s.executeUpdate(upsertSelect); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 715e7c4..519b38f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -788,7 +788,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result abstract protected String getName(); abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - Queue<PeekingResultIterator> allIterators, int estFlattenedSize); + Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException; @Override public int size() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index ef8c4a6..a17258a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -126,8 +126,9 @@ public class ChunkedResultIterator implements PeekingResultIterator { scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey)); if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); String tableName = tableRef.getTable().getPhysicalName().getString(); + long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator( - new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize); + new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold), chunkSize); resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName); } return resultIterator; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java new file mode 100644 index 0000000..6a0c78f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.schema.TableRef; + +public class DefaultTableResultIteratorFactory implements TableResultIteratorFactory { + + @Override + public TableResultIterator newIterator(StatementContext context, TableRef tableRef, Scan scan, + CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException { + return new TableResultIterator(context.getConnection().getMutationState(), tableRef, scan, scanMetrics, renewLeaseThreshold); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 265831c..64c97c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -67,7 +67,7 @@ public class ParallelIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) { + final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -93,24 +93,25 @@ public class ParallelIterators extends BaseResultIterators { int numScans = scanLocations.size(); context.getOverallQueryMetrics().updateNumParallelScans(numScans); GLOBAL_NUM_PARALLEL_SCANS.update(numScans); + final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); for (ScanLocator scanLocation : scanLocations) { final Scan scan = scanLocation.getScan(); final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName); final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName); + final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(context, tableRef, scan, scanMetrics, renewLeaseThreshold); + context.getConnection().addIterator(tableResultItr); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { @Override public PeekingResultIterator call() throws Exception { long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, scanMetrics); + tableResultItr.initScanner(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); } - PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, physicalTableName); - + PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName); // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed. iterator.peek(); - allIterators.add(iterator); return iterator; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index 604e1a7..7f865ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -38,12 +38,12 @@ import org.apache.phoenix.util.ServerUtil; public class ScanningResultIterator implements ResultIterator { private final ResultScanner scanner; private final CombinableMetric scanMetrics; - + public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) { this.scanner = scanner; this.scanMetrics = scanMetrics; } - + @Override public void close() throws SQLException { scanner.close(); @@ -70,11 +70,11 @@ public class ScanningResultIterator implements ResultIterator { public void explain(List<String> planSteps) { } - @Override - public String toString() { - return "ScanningResultIterator [scanner=" + scanner + "]"; - } - + @Override + public String toString() { + return "ScanningResultIterator [scanner=" + scanner + "]"; + } + private void calculateScanSize(Result result) { if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) { if (result != null) { @@ -89,4 +89,9 @@ public class ScanningResultIterator implements ResultIterator { } } } + + + public ResultScanner getScanner() { + return scanner; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index e0936b2..a221ba2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -29,7 +29,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.iterate.TableResultIterator.ScannerCreation; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.trace.util.Tracing; @@ -75,12 +75,15 @@ public class SerialIterators extends BaseResultIterators { overallScan.setStopRow(lastScan.getStopRow()); final String tableName = tableRef.getTable().getPhysicalName().getString(); final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName); + final PhoenixConnection conn = context.getConnection(); + final long renewLeaseThreshold = conn.getQueryServices().getRenewLeaseThresholdMilliSeconds(); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { @Override public PeekingResultIterator call() throws Exception { List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size()); for (final Scan scan : scans) { - ResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), ScannerCreation.DELAYED); + TableResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold); + conn.addIterator(scanner); concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, tableName)); } PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 7d07a4a..d97a535 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -17,22 +17,31 @@ */ package org.apache.phoenix.iterate; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED; + import java.io.IOException; import java.sql.SQLException; import java.util.List; +import javax.annotation.concurrent.GuardedBy; + +import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ServerUtil; +import com.google.common.annotations.VisibleForTesting; + /** * @@ -43,84 +52,116 @@ import org.apache.phoenix.util.ServerUtil; * @since 0.1 */ public class TableResultIterator implements ResultIterator { - public enum ScannerCreation {IMMEDIATE, DELAYED}; - private final Scan scan; private final HTableInterface htable; - private volatile ResultIterator delegate; private final CombinableMetric scanMetrics; - - public TableResultIterator(MutationState mutationState, Scan scan, TableRef tableRef, CombinableMetric scanMetrics) throws SQLException { - this(mutationState, tableRef, scan, scanMetrics); - } + private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR; + private final long renewLeaseThreshold; - /* - * Delay the creation of the underlying HBase ResultScanner if creationMode is DELAYED. - * Though no rows are returned when the scanner is created, it still makes several RPCs - * to open the scanner. In queries run serially (i.e. SELECT ... LIMIT 1), we do not - * want to be hit with this cost when it's likely we'll never execute those scanners. - */ - private ResultIterator getDelegate(boolean isClosing) throws SQLException { - ResultIterator delegate = this.delegate; - if (delegate == null) { - synchronized (this) { - delegate = this.delegate; - if (delegate == null) { - try { - this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan), scanMetrics); - } catch (IOException e) { - Closeables.closeQuietly(htable); - throw ServerUtil.parseServerException(e); - } - } - } - } - return delegate; - } - - public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException { - this(mutationState, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE); + @GuardedBy("this") + private ResultIterator scanIterator; + + @GuardedBy("this") + private boolean closed = false; + + @GuardedBy("this") + private long renewLeaseTime = 0; + + @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE! + TableResultIterator() { + this.scanMetrics = null; + this.renewLeaseThreshold = 0; + this.htable = null; + this.scan = null; } - public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws SQLException { + public static enum RenewLeaseStatus { + RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED + }; + + + public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException { this.scan = scan; this.scanMetrics = scanMetrics; PTable table = tableRef.getTable(); htable = mutationState.getHTable(table); - if (creationMode == ScannerCreation.IMMEDIATE) { - getDelegate(false); - } + this.scanIterator = UNINITIALIZED_SCANNER; + this.renewLeaseThreshold = renewLeaseThreshold; } @Override - public void close() throws SQLException { + public synchronized void close() throws SQLException { + closed = true; // ok to say closed even if the below code throws an exception try { - getDelegate(true).close(); + scanIterator.close(); } finally { try { + scanIterator = UNINITIALIZED_SCANNER; htable.close(); } catch (IOException e) { throw ServerUtil.parseServerException(e); } } } + + @Override + public synchronized Tuple next() throws SQLException { + initScanner(); + Tuple t = scanIterator.next(); + return t; + } + + public synchronized void initScanner() throws SQLException { + if (closed) { + return; + } + ResultIterator delegate = this.scanIterator; + if (delegate == UNINITIALIZED_SCANNER) { + try { + this.scanIterator = + new ScanningResultIterator(htable.getScanner(scan), scanMetrics); + } catch (IOException e) { + Closeables.closeQuietly(htable); + throw ServerUtil.parseServerException(e); + } + } + } @Override - public Tuple next() throws SQLException { - return getDelegate(false).next(); + public String toString() { + return "TableResultIterator [htable=" + htable + ", scan=" + scan + "]"; + } + + public synchronized RenewLeaseStatus renewLease() { + if (closed) { + return CLOSED; + } + if (scanIterator == UNINITIALIZED_SCANNER) { + return UNINITIALIZED; + } + long delay = now() - renewLeaseTime; + if (delay < renewLeaseThreshold) { + return THRESHOLD_NOT_REACHED; + } + if (scanIterator instanceof ScanningResultIterator + && ((ScanningResultIterator)scanIterator).getScanner() instanceof AbstractClientScanner) { + // Need this explicit cast because HBase's ResultScanner doesn't have this method exposed. + boolean leaseRenewed = ((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease(); + if (leaseRenewed) { + renewLeaseTime = now(); + return RENEWED; + } + } + return NOT_RENEWED; + } + + private static long now() { + return System.currentTimeMillis(); } @Override public void explain(List<String> planSteps) { - try { - getDelegate(false).explain(planSteps); - } catch (SQLException e) { - throw new RuntimeException(e); - } + scanIterator.explain(planSteps); } - @Override - public String toString() { - return "TableResultIterator [htable=" + htable + ", scan=" + scan + "]"; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java new file mode 100644 index 0000000..083daf1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.schema.TableRef; + +public interface TableResultIteratorFactory { + public TableResultIterator newIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException; +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index d547fb4..b7bfc34 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -108,8 +108,14 @@ import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Lists; import co.cask.tephra.TransactionContext; - - +import java.lang.ref.WeakReference; +import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.Nonnull; +import org.apache.phoenix.iterate.DefaultTableResultIteratorFactory; +import org.apache.phoenix.iterate.TableResultIterator; +import org.apache.phoenix.iterate.TableResultIteratorFactory; +import com.google.common.annotations.VisibleForTesting; +import static com.google.common.base.Preconditions.checkNotNull; /** * * JDBC Connection implementation of Phoenix. @@ -141,7 +147,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final String timestampPattern; private int statementExecutionCounter; private TraceScope traceScope = null; - private boolean isClosed = false; + private volatile boolean isClosed = false; private Sampler<?> sampler; private boolean readOnly = false; private Consistency consistency = Consistency.STRONG; @@ -150,6 +156,9 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final boolean isDescVarLengthRowKeyUpgrade; private ParallelIteratorFactory parallelIteratorFactory; + private final LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue; + private TableResultIteratorFactory tableResultIteratorFactory; + static { Tracing.addTraceMetricsSource(); } @@ -278,6 +287,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea // setup tracing, if its enabled this.sampler = Tracing.getConfiguredSampler(this); this.customTracingAnnotations = getImmutableCustomTracingAnnotations(); + this.scannerQueue = new LinkedBlockingQueue<>(); + this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory(); } private static void checkScn(Long scnParam) throws SQLException { @@ -1002,4 +1013,27 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public void setIteratorFactory(ParallelIteratorFactory parallelIteratorFactory) { this.parallelIteratorFactory = parallelIteratorFactory; } + + public void addIterator(@Nonnull TableResultIterator itr) { + if (services.supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE)) { + checkNotNull(itr); + scannerQueue.add(new WeakReference<TableResultIterator>(itr)); + } + } + + public LinkedBlockingQueue<WeakReference<TableResultIterator>> getScanners() { + return scannerQueue; + } + + @VisibleForTesting + @Nonnull + public TableResultIteratorFactory getTableResultIteratorFactory() { + return tableResultIteratorFactory; + } + + @VisibleForTesting + public void setTableResultIteratorFactory(TableResultIteratorFactory factory) { + checkNotNull(factory); + this.tableResultIteratorFactory = factory; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index a4748b3..21bd591 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -288,7 +288,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { private final ResultSet emptyResultSet; public static final int MAX_LOCAL_SI_VERSION_DISALLOW = VersionUtil.encodeVersion("0", "98", "8"); public static final int MIN_LOCAL_SI_VERSION_DISALLOW = VersionUtil.encodeVersion("0", "98", "6"); - + public static final int MIN_RENEW_LEASE_VERSION = VersionUtil.encodeVersion("1", "0", "4"); + // Version below which we should turn off essential column family. public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = VersionUtil.encodeVersion("0", "94", "7"); // Version below which we should disallow usage of mutable secondary indexing. http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index d2f91b3..2df76fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -107,9 +107,10 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null StatementContext ctx = queryPlan.getContext(); ReadMetricQueue readMetrics = ctx.getReadMetricsQueue(); String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString(); + long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); for (Scan scan : scans) { final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), - queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName)); + queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold); PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); iterators.add(peekingResultIterator); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index d839fa3..25d7ff4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import co.cask.tephra.TransactionSystemClient; + import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -44,8 +46,6 @@ import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.PTableStats; -import co.cask.tephra.TransactionSystemClient; - public interface ConnectionQueryServices extends QueryServices, MetaDataMutated { public static final int INITIAL_META_DATA_TABLE_CAPACITY = 100; @@ -108,7 +108,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated */ public KeyValueBuilder getKeyValueBuilder(); - public enum Feature {LOCAL_INDEX}; + public enum Feature {LOCAL_INDEX, RENEW_LEASE}; public boolean supportsFeature(Feature feature); public String getUserName(); @@ -120,4 +120,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public int getSequenceSaltBuckets(); TransactionSystemClient getTransactionSystemClient(); + public long getRenewLeaseThresholdMilliSeconds(); + public boolean isRenewingLeasesEnabled(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index daa7809..0472d3a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -16,16 +16,21 @@ * limitations under the License. */ package org.apache.phoenix.query; - +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hbase.HColumnDescriptor.TTL; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS; import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0; import java.io.IOException; +import java.lang.ref.WeakReference; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -35,17 +40,31 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.concurrent.GuardedBy; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.TxConstants; +import co.cask.tephra.distributed.PooledClientProvider; +import co.cask.tephra.distributed.TransactionServiceClient; +import co.cask.tephra.hbase10.coprocessor.TransactionProcessor; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -117,6 +136,8 @@ import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixTransactionalIndexer; +import org.apache.phoenix.iterate.TableResultIterator; +import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; @@ -173,27 +194,17 @@ import org.apache.twill.zookeeper.ZKClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TxConstants; -import co.cask.tephra.distributed.PooledClientProvider; -import co.cask.tephra.distributed.TransactionServiceClient; -import co.cask.tephra.hbase10.coprocessor.TransactionProcessor; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TxConstants; -import co.cask.tephra.distributed.PooledClientProvider; -import co.cask.tephra.distributed.TransactionServiceClient; -import co.cask.tephra.hbase10.coprocessor.TransactionProcessor; - public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class); private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100; @@ -235,6 +246,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // setting this member variable guarded by "connectionCountLock" private volatile ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap(); private KeyValueBuilder kvBuilder; + private final int renewLeaseTaskFrequency; + private final int renewLeasePoolSize; + private final int renewLeaseThreshold; + // List of queues instead of a single queue to provide reduced contention via lock striping + private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues; + private ScheduledExecutorService renewLeaseExecutor; + private final boolean renewLeaseEnabled; + private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); @@ -247,7 +266,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement int hbaseVersion = services.getLowestClusterHBaseVersion(); return hbaseVersion < PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW || hbaseVersion > PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW; } - }); + }, + Feature.RENEW_LEASE, new FeatureSupported() { + @Override + public boolean isSupported(ConnectionQueryServices services) { + int hbaseVersion = services.getLowestClusterHBaseVersion(); + return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION; + } + }); private PMetaData newEmptyMetaData() { long maxSizeBytes = props.getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, @@ -301,6 +327,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .expireAfterWrite(halfStatsUpdateFreq, TimeUnit.MILLISECONDS) .build(); this.returnSequenceValues = props.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES); + this.renewLeaseEnabled = config.getBoolean(RENEW_LEASE_ENABLED, DEFAULT_RENEW_LEASE_ENABLED); + this.renewLeasePoolSize = config.getInt(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE); + this.renewLeaseThreshold = config.getInt(RENEW_LEASE_THRESHOLD_MILLISECONDS, DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS); + this.renewLeaseTaskFrequency = config.getInt(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS); + List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> list = Lists.newArrayListWithCapacity(renewLeasePoolSize); + for (int i = 0; i < renewLeasePoolSize; i++) { + LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue = new LinkedBlockingQueue<WeakReference<PhoenixConnection>>(); + list.add(queue); + } + connectionQueues = ImmutableList.copyOf(list); } @Override @@ -415,6 +451,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } finally { try { childServices.clear(); + if (renewLeaseExecutor != null) { + renewLeaseExecutor.shutdownNow(); + } synchronized (latestMetaDataLock) { latestMetaData = null; latestMetaDataLock.notifyAll(); @@ -2244,6 +2283,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement String tableName, long timestamp, String columns) throws SQLException { return addColumn(oldMetaConnection, tableName, timestamp, columns, true); } + + private static class RenewLeaseThreadFactory implements ThreadFactory { + private final AtomicInteger counter = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("Phoenix-Scanner-Renewlease-" + counter.getAndIncrement()); + return t; + } + } @Override public void init(final String url, final Properties props) throws SQLException { @@ -2441,8 +2491,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (NewerTableAlreadyExistsException e) { } catch (TableAlreadyExistsException e) { } - - + scheduleRenewLeaseTasks(); } catch (Exception e) { if (e instanceof SQLException) { initializationException = (SQLException)e; @@ -2479,6 +2528,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + private void scheduleRenewLeaseTasks() { + if (isRenewingLeasesEnabled()) { + renewLeaseExecutor = + Executors.newScheduledThreadPool(renewLeasePoolSize, + new RenewLeaseThreadFactory()); + for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> q : connectionQueues) { + renewLeaseExecutor.scheduleAtFixedRate(new RenewLeaseTask(q), 0, + renewLeaseTaskFrequency, TimeUnit.MILLISECONDS); + } + } + } + private static int getSaltBuckets(TableAlreadyExistsException e) { PTable table = e.getTable(); Integer sequenceSaltBuckets = table == null ? null : table.getBucketNum(); @@ -2957,12 +3018,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public void addConnection(PhoenixConnection connection) throws SQLException { + connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection)); if (returnSequenceValues) { synchronized (connectionCountLock) { connectionCount++; } } } + + private int getQueueIndex(PhoenixConnection conn) { + return ThreadLocalRandom.current().nextInt(renewLeasePoolSize); + } @Override public void removeConnection(PhoenixConnection connection) throws SQLException { @@ -3153,4 +3219,102 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); return result; } + + @VisibleForTesting + static class RenewLeaseTask implements Runnable { + + private final LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue; + private final Random random = new Random(); + private static final int MAX_WAIT_TIME = 1000; + + RenewLeaseTask(LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue) { + this.connectionsQueue = queue; + } + + private void waitForRandomDuration() { + try { + new CountDownLatch(1).await(random.nextInt(MAX_WAIT_TIME), MILLISECONDS); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + logger.warn("Exception encountered while waiting before renewing leases: " + ex); + } + } + + @Override + public void run() { + int numConnections = connectionsQueue.size(); + boolean wait = true; + // We keep adding items to the end of the queue. So to stop the loop, iterate only up to + // whatever the current count is. + while (numConnections > 0) { + if (wait) { + // wait for some random duration to prevent all threads from renewing lease at + // the same time. + waitForRandomDuration(); + wait = false; + } + // It is guaranteed that this poll won't hang indefinitely because this is the + // only thread that removes items from the queue. + WeakReference<PhoenixConnection> connRef = connectionsQueue.poll(); + PhoenixConnection conn = connRef.get(); + try { + if (conn != null && !conn.isClosed()) { + LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = + conn.getScanners(); + // We keep adding items to the end of the queue. So to stop the loop, + // iterate only up to whatever the current count is. + int numScanners = scannerQueue.size(); + int renewed = 0; + long start = System.currentTimeMillis(); + while (numScanners > 0) { + WeakReference<TableResultIterator> ref = scannerQueue.poll(); + TableResultIterator scanningItr = ref.get(); + if (scanningItr != null) { + RenewLeaseStatus status = scanningItr.renewLease(); + switch (status) { + case RENEWED: + renewed++; + // add it back at the tail + scannerQueue.offer(new WeakReference<TableResultIterator>( + scanningItr)); + logger.info("Lease renewed for scanner: " + scanningItr); + break; + case UNINITIALIZED: + case THRESHOLD_NOT_REACHED: + // add it back at the tail + scannerQueue.offer(new WeakReference<TableResultIterator>( + scanningItr)); + break; + // if lease wasn't renewed or scanner was closed, don't add the + // scanner back to the queue. + case CLOSED: + case NOT_RENEWED: + break; + } + } + numScanners--; + } + if (renewed > 0) { + logger.info("Renewed leases for " + renewed + " scanner/s in " + + (System.currentTimeMillis() - start) + " ms "); + } + connectionsQueue.offer(connRef); + } + } catch (Exception e) { + logger.warn("Exception encountered when renewing lease: " + e); + } + numConnections--; + } + } + } + + @Override + public long getRenewLeaseThresholdMilliSeconds() { + return renewLeaseThreshold; + } + + @Override + public boolean isRenewingLeasesEnabled() { + return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index f1ab319..199b010 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -567,4 +567,14 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple throws SQLException { return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, null); } + + @Override + public long getRenewLeaseThresholdMilliSeconds() { + return 0; + } + + @Override + public boolean isRenewingLeasesEnabled() { + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index b56ff85..84f3e74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -265,6 +265,7 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple return getDelegate().getTransactionSystemClient(); } + @Override public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException { return getDelegate().createFunction(functionData, function, temporary); @@ -293,4 +294,14 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple throws SQLException { return getDelegate().dropFunction(tableMetadata, ifExists); } + + @Override + public long getRenewLeaseThresholdMilliSeconds() { + return getDelegate().getRenewLeaseThresholdMilliSeconds(); + } + + @Override + public boolean isRenewingLeasesEnabled() { + return getDelegate().isRenewingLeasesEnabled(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 8f5f9a1..5fd7403 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -196,6 +196,11 @@ public interface QueryServices extends SQLCloseable { public static final String QUERY_SERVER_DNS_NAMESERVER_ATTRIB = "phoenix.queryserver.dns.nameserver"; public static final String QUERY_SERVER_DNS_INTERFACE_ATTRIB = "phoenix.queryserver.dns.interface"; public static final String QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB = "hbase.security.authentication"; + + public static final String RENEW_LEASE_ENABLED = "phoenix.scanner.lease.renew.enabled"; + public static final String RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS = "phoenix.scanner.lease.renew.interval"; + public static final String RENEW_LEASE_THRESHOLD_MILLISECONDS = "phoenix.scanner.lease.threshold"; + public static final String RENEW_LEASE_THREAD_POOL_SIZE = "phoenix.scanner.lease.pool.size"; /** * Get executor service used for parallel scans http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index b256ff3..def0a18 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.query; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE; import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; @@ -53,8 +54,12 @@ import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDA import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_ATTRIB; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_THREAD_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_THRESHOLD_MILLISECONDS; import static org.apache.phoenix.query.QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB; +import static org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS; import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; @@ -224,6 +229,13 @@ public class QueryServicesOptions { // doesn't depend on phoenix-core. public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF"; public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765; + public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true; + public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS = + DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2; + public static final int DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS = + (3 * DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) / 4; + public static final int DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE = 10; + @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() { { @@ -233,7 +245,7 @@ public class QueryServicesOptions { add("credential"); } }; - + private final Configuration config; private QueryServicesOptions(Configuration config) { @@ -292,7 +304,11 @@ public class QueryServicesOptions { .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX) .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER) .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, DEFAULT_REQUEST_LEVEL_METRICS_ENABLED) - .setIfUnset(ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE); + .setIfUnset(ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE) + .setIfUnset(ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE) + .setIfUnset(RENEW_LEASE_THRESHOLD_MILLISECONDS, DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS) + .setIfUnset(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS) + .setIfUnset(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE); ; // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set @@ -595,4 +611,9 @@ public class QueryServicesOptions { config.setBoolean(COMMIT_STATS_ASYNC, flag); return this; } + + public QueryServicesOptions setEnableRenewLease(boolean enable) { + config.setBoolean(RENEW_LEASE_ENABLED, enable); + return this; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 0c1c71b..3f16065 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -773,7 +773,7 @@ public abstract class BaseTest { ensureTableCreated(url, tableName, null, null); } - protected static void ensureTableCreated(String url, String tableName, byte[][] splits) throws SQLException { + public static void ensureTableCreated(String url, String tableName, byte[][] splits) throws SQLException { ensureTableCreated(url, tableName, splits, null); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 215110c..037e8e7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -57,6 +57,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; private static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = false; private static final boolean DEFAULT_COMMIT_STATS_ASYNC = false; + public static final boolean DEFAULT_RENEW_LEASE_ENABLED = false; /** @@ -97,7 +98,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER) .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS) .setRunUpdateStatsAsync(DEFAULT_RUN_UPDATE_STATS_ASYNC) - .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC); + .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC) + .setEnableRenewLease(DEFAULT_RENEW_LEASE_ENABLED); } public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {
