This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 931a3e7743 PHOENIX-7229 Leverage bloom filters for single key point lookups (#1832)
931a3e7743 is described below
commit 931a3e7743c47696cb14fb94de23070f841e0bcf
Author: tkhurana <[email protected]>
AuthorDate: Sat Feb 24 21:20:15 2024 -0800
PHOENIX-7229 Leverage bloom filters for single key point lookups (#1832)
Co-authored-by: Viraj Jasani <[email protected]>
---
.../phoenix/iterate/BaseResultIterators.java | 17 +-
phoenix-core/pom.xml | 5 +
.../org/apache/phoenix/end2end/BloomFilterIT.java | 244 +++++++++++++++++++++
.../apache/phoenix/compile/WhereOptimizerTest.java | 50 +++++
.../java/org/apache/phoenix/util/TestUtil.java | 5 +
5 files changed, 320 insertions(+), 1 deletion(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 5fda525316..359cf2255b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -37,6 +37,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
+import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.Arrays;
@@ -72,6 +73,7 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.coprocessorclient.HashJoinCacheNotFoundException;
import org.apache.phoenix.coprocessorclient.UngroupedAggregateRegionObserverHelper;
+import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.ScanPlan;
@@ -936,7 +938,20 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if (!isLocalIndex && scanRanges.isPointLookup() && !scanRanges.useSkipScanFilter()) {
List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(1);
List<Scan> scans = Lists.newArrayListWithExpectedSize(1);
- scans.add(context.getScan());
+ Scan scanFromContext = context.getScan();
+ if (scanRanges.getPointLookupCount() == 1) {
+ // leverage bloom filter for single key point lookup by turning the scan
+ // into a Get (see Scan#isGetScan())
+ try {
+ scanFromContext = new Scan(context.getScan());
+ } catch (IOException e) {
+ LOGGER.error("Failure to construct point lookup scan", e);
+ throw new PhoenixIOException(e);
+ }
+ scanFromContext.withStopRow(scanFromContext.getStartRow(),
+ scanFromContext.includeStartRow());
+ }
+ scans.add(scanFromContext);
parallelScans.add(scans);
generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST,
GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), parallelScans, estimates,
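For context on the hunk above: HBase region servers consult bloom filters only for Get-shaped reads, and Scan#isGetScan() is the shape check, true when the start row equals the stop row and both endpoints are inclusive. That is why the conversion is guarded by getPointLookupCount() == 1; multi-key point lookups keep distinct start/stop keys (or a skip-scan filter) and cannot take this path. A minimal standalone sketch of the two shapes, not part of the commit and assuming the HBase 2.x client API, with a made-up row key:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PointLookupScanSketch {
        public static void main(String[] args) {
            byte[] rowKey = Bytes.toBytes("row-1");

            // Ranged point lookup, [rowKey, rowKey + 0x00): not Get-shaped,
            // so region servers skip the bloom filter check.
            Scan ranged = new Scan()
                    .withStartRow(rowKey, true)
                    .withStopRow(Bytes.add(rowKey, new byte[] { 0x00 }), false);
            System.out.println(ranged.isGetScan()); // false

            // The shape the patch produces: stop row == start row, both inclusive.
            Scan pointLookup = new Scan()
                    .withStartRow(rowKey, true)
                    .withStopRow(rowKey, true);
            System.out.println(pointLookup.isGetScan()); // true
        }
    }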
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 6b0738e365..36f994d6ae 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -227,6 +227,11 @@
<artifactId>hbase-hadoop2-compat</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
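(Presumably the test-scoped hbase-hadoop-compat dependency added above is needed for the region server metrics machinery that the new BloomFilterIT below reads its bloom filter counters from.)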
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java
new file mode 100644
index 0000000000..555650d2ee
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.flush;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerWrapper;
+import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class BloomFilterIT extends ParallelStatsDisabledIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterIT.class);
+ private static class BloomFilterMetrics {
+ // total lookup requests
+ private long requestsCount;
+ // requests where key does not exist
+ private long negativeResultsCount;
+ // lookup requests that could have used a bloom filter but none was present in the storefile
+ private long eligibleRequestsCount;
+
+ private BloomFilterMetrics() {
+ this.requestsCount = 0;
+ this.negativeResultsCount = 0;
+ this.eligibleRequestsCount = 0;
+ }
+
+ private BloomFilterMetrics(long requestsCount, long negativeResultsCount, long eligibleRequestsCount) {
+ this.requestsCount = requestsCount;
+ this.negativeResultsCount = negativeResultsCount;
+ this.eligibleRequestsCount = eligibleRequestsCount;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ BloomFilterMetrics rhs = (BloomFilterMetrics)obj;
+ return (this.requestsCount == rhs.requestsCount &&
+ this.negativeResultsCount == rhs.negativeResultsCount &&
+ this.eligibleRequestsCount == rhs.eligibleRequestsCount);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("requestsCount", requestsCount)
+ .add("negativeResultsCount", negativeResultsCount)
+ .add("eligibleRequestsCount", eligibleRequestsCount)
+ .toString();
+ }
+ }
+ private BloomFilterMetrics beforeMetrics;
+
+ private BloomFilterMetrics getBloomFilterMetrics() {
+ HBaseTestingUtility util = getUtility();
+ HRegionServer regionServer = util.getHBaseCluster().getRegionServer(0);
+ MetricsRegionServer regionServerMetrics = regionServer.getMetrics();
+ MetricsRegionServerWrapper regionServerWrapper = regionServerMetrics.getRegionServerWrapper();
+ long requestsCount = regionServerWrapper.getBloomFilterRequestsCount();
+ long negativeResultsCount = regionServerWrapper.getBloomFilterNegativeResultsCount();
+ long eligibleRequestsCount = regionServerWrapper.getBloomFilterEligibleRequestsCount();
+ return new BloomFilterMetrics(requestsCount, negativeResultsCount, eligibleRequestsCount);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(HConstants.REGIONSERVER_METRICS_PERIOD, Long.toString(1000));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void testSetup() {
+ beforeMetrics = getBloomFilterMetrics();
+ }
+
+ @Test
+ public void testPointLookup() throws Exception {
+ String tableName = generateUniqueName();
+ BloomFilterMetrics expectedMetrics = new BloomFilterMetrics();
+ //String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) BLOOMFILTER='NONE'", tableName);
+ String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR)", tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(ddl);
+ populateTable(conn, tableName);
+ // flush the memstore to storefiles which will write the bloom filter
+ flush(getUtility(), TableName.valueOf(tableName));
+ // negative key point lookup
+ String dql = String.format("SELECT * FROM %s where id = 7", tableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertFalse(rs.next());
+ expectedMetrics.requestsCount +=1;
+ expectedMetrics.negativeResultsCount +=1;
+ }
+ // key exists
+ dql = String.format("SELECT * FROM %s where id = 4", tableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ // negative request shouldn't be incremented since key exists
+ expectedMetrics.requestsCount +=1;
+ }
+ // multiple keys point lookup
+ dql = String.format("SELECT * FROM %s where id IN (4,7)", tableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ assertFalse(rs.next());
+ // bloom filter won't be used since scan start/stop key is different
+ }
+ verifyBloomFilterMetrics(expectedMetrics);
+ }
+ }
+
+ @Test
+ public void testPointLookupOnSaltedTable() throws Exception {
+ String tableName = generateUniqueName();
+ BloomFilterMetrics expectedMetrics = new BloomFilterMetrics();
+ String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) SALT_BUCKETS=3", tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(ddl);
+ populateTable(conn, tableName);
+ flush(getUtility(), TableName.valueOf(tableName));
+ // negative key point lookup
+ String dql = String.format("SELECT * FROM %s where id = 7", tableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertFalse(rs.next());
+ expectedMetrics.requestsCount +=1;
+ expectedMetrics.negativeResultsCount +=1;
+ }
+ }
+ verifyBloomFilterMetrics(expectedMetrics);
+ }
+
+ @Test
+ public void testAlterBloomFilter() throws Exception {
+ String tableName = generateUniqueName();
+ BloomFilterMetrics expectedMetrics = new BloomFilterMetrics();
+ String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) BLOOMFILTER='NONE'", tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(ddl);
+ populateTable(conn, tableName);
+ flush(getUtility(), TableName.valueOf(tableName));
+ String dql = String.format("SELECT * FROM %s where id = 7", tableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertFalse(rs.next());
+ // since bloom filter is not enabled
+ expectedMetrics.eligibleRequestsCount +=1;
+ verifyBloomFilterMetrics(expectedMetrics);
+ }
+ ddl = String.format("ALTER TABLE %s SET BLOOMFILTER='ROW'", tableName);
+ conn.createStatement().execute(ddl);
+ // ALTER TABLE changes the table descriptor; the region re-opens, which resets the metrics
+ beforeMetrics = getBloomFilterMetrics();
+ expectedMetrics = new BloomFilterMetrics();
+ // Insert 2 more rows
+ String dml = String.format("UPSERT INTO %s VALUES (100, 'val_100')", tableName);
+ conn.createStatement().execute(dml);
+ dml = String.format("UPSERT INTO %s VALUES (200, 'val_200')", tableName);
+ conn.createStatement().execute(dml);
+ conn.commit();
+ // A new storefile should be created with bloom filter
+ flush(getUtility(), TableName.valueOf(tableName));
+ dql = String.format("SELECT * FROM %s where id = 150", tableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertFalse(rs.next());
+ expectedMetrics.requestsCount +=1;
+ expectedMetrics.negativeResultsCount +=1;
+ verifyBloomFilterMetrics(expectedMetrics);
+ }
+ }
+ }
+
+ private void verifyBloomFilterMetrics(BloomFilterMetrics expectedMetrics) throws InterruptedException {
+ long metricsDelay = config.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
+ HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD);
+ // Ensure that the region server accumulates the metrics
+ Thread.sleep(metricsDelay + 1000);
+ BloomFilterMetrics afterMetrics = getBloomFilterMetrics();
+ LOGGER.info("Before={} After={} Expected={}", beforeMetrics, afterMetrics, expectedMetrics);
+
+ BloomFilterMetrics deltaMetrics = new BloomFilterMetrics(
+ afterMetrics.requestsCount - beforeMetrics.requestsCount,
+ afterMetrics.negativeResultsCount - beforeMetrics.negativeResultsCount,
+ afterMetrics.eligibleRequestsCount - beforeMetrics.eligibleRequestsCount);
+
+ Assert.assertEquals(expectedMetrics, deltaMetrics);
+ }
+
+ private void populateTable(Connection conn, String tableName) throws SQLException {
+ try(PreparedStatement ps = conn.prepareStatement(String.format("UPSERT INTO %s VALUES (?, ?)", tableName))) {
+ for (int i = 0; i < 16; i = i + 2) {
+ ps.setInt(1, i);
+ ps.setString(2, "val_" + i);
+ ps.executeUpdate();
+ }
+ conn.commit();
+ }
+ }
+}
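Two asides on the IT above. First, testPointLookup relies on HBase's default bloom filter type being ROW, which is why the plain CREATE TABLE (with the BLOOMFILTER='NONE' variant left commented out) still exercises the filter. Second, when debugging, the bloom filter type a table's column families actually carry can be checked through the standard HBase Admin API; a small sketch, not part of the commit, with "MY_TABLE" as a placeholder table name:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.regionserver.BloomType;

    public class BloomFilterTypeCheck {
        public static void main(String[] args) throws Exception {
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Admin admin = conn.getAdmin()) {
                // "MY_TABLE" is a placeholder for the Phoenix table under test.
                for (ColumnFamilyDescriptor cf :
                        admin.getDescriptor(TableName.valueOf("MY_TABLE")).getColumnFamilies()) {
                    BloomType type = cf.getBloomFilterType(); // NONE, ROW, ROWCOL, ...
                    System.out.println(cf.getNameAsString() + " -> " + type);
                }
            }
        }
    }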
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
index 20bd097e04..c1584c19c5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
@@ -2502,6 +2502,56 @@ public class WhereOptimizerTest extends BaseConnectionlessQueryTest {
assertArrayEquals(stopRow, scan.getStopRow());
}
+ @Test
+ public void testScanRangeForPointLookup() throws SQLException {
+ String tenantId = "000000000000001";
+ String entityId = "002333333333333";
+ String query = String.format("select * from atable where organization_id='%s' and entity_id='%s'",
+ tenantId, entityId);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ QueryPlan optimizedPlan = TestUtil.getOptimizeQueryPlan(conn, query);
+ byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), PVarchar.INSTANCE.toBytes(entityId));
+ byte[] stopRow = ByteUtil.nextKey(startRow);
+ validateScanRangesForPointLookup(optimizedPlan, startRow, stopRow);
+ }
+ }
+
+ @Test
+ public void testScanRangeForPointLookupRVC() throws SQLException {
+ String tenantId = "000000000000001";
+ String entityId = "002333333333333";
+ String query = String.format("select * from atable where (organization_id, entity_id) IN (('%s','%s'))",
+ tenantId, entityId);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ QueryPlan optimizedPlan = TestUtil.getOptimizeQueryPlan(conn, query);
+ byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), PVarchar.INSTANCE.toBytes(entityId));
+ byte[] stopRow = ByteUtil.nextKey(startRow);
+ validateScanRangesForPointLookup(optimizedPlan, startRow, stopRow);
+ }
+ }
+
+ private static void validateScanRangesForPointLookup(QueryPlan optimizedPlan, byte[] startRow, byte[] stopRow) {
+ StatementContext context = optimizedPlan.getContext();
+ ScanRanges scanRanges = context.getScanRanges();
+ assertTrue(scanRanges.isPointLookup());
+ assertEquals(1, scanRanges.getPointLookupCount());
+ // scan from StatementContext has scan range [start, next(start))
+ Scan scanFromContext = context.getScan();
+ assertArrayEquals(startRow, scanFromContext.getStartRow());
+ assertTrue(scanFromContext.includeStartRow());
+ assertArrayEquals(stopRow, scanFromContext.getStopRow());
+ assertFalse(scanFromContext.includeStopRow());
+
+ List<List<Scan>> scans = optimizedPlan.getScans();
+ assertEquals(1, scans.size());
+ assertEquals(1, scans.get(0).size());
+ Scan scanFromIterator = scans.get(0).get(0);
+ // scan from iterator has the same start and stop row [start, start], i.e. a Get
+ assertTrue(scanFromIterator.isGetScan());
+ assertTrue(scanFromIterator.includeStartRow());
+ assertTrue(scanFromIterator.includeStopRow());
+ }
+
private static StatementContext compileStatementTenantSpecific(String tenantId, String query, List<Object> binds) throws Exception {
PhoenixConnection pconn = getTenantSpecificConnection("tenantId").unwrap(PhoenixConnection.class);
PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index fe0d8d1b2c..cf076f08ef 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -810,6 +810,11 @@ public class TestUtil {
conn.createStatement().execute(ddl);
}
+ public static void flush(HBaseTestingUtility utility, TableName table) throws IOException {
+ Admin admin = utility.getAdmin();
+ admin.flush(table);
+ }
+
public static void majorCompact(HBaseTestingUtility utility, TableName table)
throws IOException, InterruptedException {
long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis();