This is an automated email from the ASF dual-hosted git repository.
jisaac pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 510a5647d4 PHOENIX-6751 Force using range scan vs skip scan when using
large IN clause (#1496)
510a5647d4 is described below
commit 510a5647d4ef487935cda4624c96befb28e4b410
Author: Jacob Isaac <[email protected]>
AuthorDate: Wed Sep 7 16:06:45 2022 -0700
PHOENIX-6751 Force using range scan vs skip scan when using large IN clause
(#1496)
Co-authored-by: Jacob Isaac <[email protected]>
---
.../java/org/apache/phoenix/end2end/InListIT.java | 517 ++++++++++++++++++++-
.../org/apache/phoenix/compile/WhereOptimizer.java | 63 ++-
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../apache/phoenix/query/QueryServicesOptions.java | 5 +-
4 files changed, 572 insertions(+), 16 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
index c64426cd77..54e5acd30c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
@@ -25,16 +25,32 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
+import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.compile.QueryPlan;
@@ -43,21 +59,26 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TypeMismatchException;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.thirdparty.com.google.common.base.Function;
-import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
@Category(ParallelStatsDisabledTest.class)
+@RunWith(Parameterized.class)
public class InListIT extends ParallelStatsDisabledIT {
- private static final String TENANT_SPECIFIC_URL1 = getUrl() + ';' +
TENANT_ID_ATTRIB + "=tenant1";
private static boolean isInitialized = false;
private static String tableName = generateUniqueName();
private static String tableName2 = generateUniqueName();
@@ -67,6 +88,34 @@ public class InListIT extends ParallelStatsDisabledIT {
private static String viewName2 = generateUniqueName();
private static String prefix = generateUniqueName();
+    /**
+     * Whether this parameterized run exercises the IN-list skip-scan cardinality cap. When true
+     * the test driver is registered with {@link QueryServices#MAX_IN_LIST_SKIP_SCAN_SIZE} = 15,
+     * otherwise with -1.
+     */
+    final boolean checkMaxSkipScanCardinality;
+
+    /**
+     * Creates one parameterized instance and registers a test driver configured for it.
+     *
+     * @param param value supplied by {@code data()}; stored in
+     *              {@link #checkMaxSkipScanCardinality}
+     * @throws Exception if the test driver cannot be registered
+     */
+    public InListIT(boolean param) throws Exception {
+        checkMaxSkipScanCardinality = param;
+        // Setup max skip scan size appropriate for the tests. A singleton map is used instead of
+        // double-brace initialization so no anonymous HashMap subclass holding 'this' is created.
+        Map<String, String> overrides = Collections.singletonMap(
+                QueryServices.MAX_IN_LIST_SKIP_SCAN_SIZE,
+                String.valueOf(checkMaxSkipScanCardinality ? 15 : -1));
+        ReadOnlyProps props =
+                new ReadOnlyProps(ReadOnlyProps.EMPTY_PROPS, overrides.entrySet().iterator());
+        initAndRegisterTestDriver(getUrl(), props);
+    }
+
+    /**
+     * Supplies the two parameterizations of this class: first without, then with the
+     * skip-scan cardinality check.
+     */
+    @Parameterized.Parameters(name = "checkMaxSkipScanCardinality = {0}")
+    public static synchronized Collection<Boolean[]> data() {
+        Boolean[] withoutCardinalityCheck = { Boolean.FALSE };
+        Boolean[] withCardinalityCheck = { Boolean.TRUE };
+        return Arrays.asList(withoutCardinalityCheck, withCardinalityCheck);
+    }
+
+    /**
+     * Sets up the shared test driver with empty properties, then derives both tenant URLs
+     * from {@code getUrl()}.
+     */
+    @BeforeClass
+    public static final void doSetup() throws Exception {
+        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS, ReadOnlyProps.EMPTY_PROPS);
+        TENANT_SPECIFIC_URL1 = String.format("%s;%s=tenant1", getUrl(), TENANT_ID_ATTRIB);
+        TENANT_URL = String.format("%s;%s=%s", getUrl(), PhoenixRuntime.TENANT_ID_ATTRIB,
+                TENANT_ID);
+    }
+
@Before
public void setup() throws Exception {
if(isInitialized){
@@ -201,14 +250,91 @@ public class InListIT extends ParallelStatsDisabledIT {
}
}
+
+    /**
+     * Maps a Phoenix PK data type to the column type used in the tenant view DDL.
+     *
+     * @param pkType the data type whose JDBC SQL type selects the DDL fragment
+     * @return the DDL type; any SQL type not listed falls back to {@code VARCHAR(25)}
+     */
+    private String getType(PDataType pkType) {
+        switch (pkType.getSqlType()) {
+            case Types.CHAR:
+                return "CHAR(15)";
+            case Types.DECIMAL:
+                return "DECIMAL(8,2)";
+            case Types.INTEGER:
+                return "INTEGER";
+            case Types.BIGINT:
+                return "BIGINT";
+            case Types.DATE:
+                return "DATE";
+            case Types.TIMESTAMP:
+                return "TIMESTAMP";
+            case Types.VARCHAR:
+            default:
+                return "VARCHAR(25)";
+        }
+    }
+
+    /**
+     * Creates (if absent) the multi-tenant base table keyed by (OID, KP) that the tenant views
+     * are defined over, using a global (non-tenant) connection.
+     *
+     * @param baseTable name of the table to create
+     * @throws SQLException if the DDL fails
+     */
+    private static void createBaseTable(String baseTable) throws SQLException {
+        try (Connection globalConnection = DriverManager.getConnection(getUrl());
+                Statement ddlStatement = globalConnection.createStatement()) {
+            String ddl = String.format("CREATE TABLE IF NOT EXISTS %s(OID CHAR(15) NOT NULL,"
+                    + "KP CHAR(3) NOT NULL,ROW_ID VARCHAR, COL1 VARCHAR,COL2 VARCHAR,COL3 VARCHAR,"
+                    + "CREATED_DATE DATE,CREATED_BY CHAR(15),LAST_UPDATE DATE,"
+                    + "LAST_UPDATE_BY CHAR(15),SYSTEM_MODSTAMP DATE "
+                    + "CONSTRAINT pk PRIMARY KEY (OID,KP)) MULTI_TENANT=true,COLUMN_ENCODED_BYTES=0",
+                    baseTable);
+            ddlStatement.execute(ddl);
+        }
+    }
+
+    /**
+     * Deletes every row visible through {@code tenantView} for the given tenant, with
+     * auto-commit enabled.
+     *
+     * @param tenant     numeric tenant suffix, appended zero-padded to TENANT_PREFIX
+     * @param tenantView the view to empty
+     * @throws SQLException if the connection or DELETE fails
+     */
+    private void dropTenantViewData(int tenant, String tenantView) throws SQLException {
+        final String url = String.format("%s;%s=%s%06d", getUrl(), TENANT_ID_ATTRIB,
+                TENANT_PREFIX, tenant);
+        try (Connection conn = DriverManager.getConnection(url)) {
+            conn.setAutoCommit(true);
+            try (Statement deleteStatement = conn.createStatement()) {
+                deleteStatement.execute("DELETE FROM " + tenantView);
+            }
+        }
+    }
+
+    /**
+     * Ensures the base table exists, then creates (if absent) a tenant view over the base-table
+     * rows whose KP equals {@code partition}. The view has a primary key of ID1, ID2, ID3 with
+     * the given types and sort orders.
+     *
+     * @throws SQLException if any DDL fails
+     */
+    private void createTenantView(int tenant, String baseTable, String tenantView,
+            String partition,
+            PDataType pkType1, SortOrder pk1Order,
+            PDataType pkType2, SortOrder pk2Order,
+            PDataType pkType3, SortOrder pk3Order) throws SQLException {
+        final String id1Type = getType(pkType1);
+        final String id2Type = getType(pkType2);
+        final String id3Type = getType(pkType3);
+        createBaseTable(baseTable);
+
+        final String url = String.format("%s;%s=%s%06d", getUrl(), TENANT_ID_ATTRIB,
+                TENANT_PREFIX, tenant);
+        try (Connection conn = DriverManager.getConnection(url);
+                Statement ddlStatement = conn.createStatement()) {
+            String viewDdl = String.format(
+                    "CREATE VIEW IF NOT EXISTS %s(ID1 %s not null,ID2 %s not null,ID3 %s not null,"
+                            + "COL4 VARCHAR,COL5 VARCHAR,COL6 VARCHAR "
+                            + "CONSTRAINT pk PRIMARY KEY (ID1 %s, ID2 %s, ID3 %s)) "
+                            + "AS SELECT * FROM %s WHERE KP = '%s'",
+                    tenantView, id1Type, id2Type, id3Type,
+                    pk1Order.name(), pk2Order.name(), pk3Order.name(),
+                    baseTable, partition);
+            ddlStatement.execute(viewDdl);
+        }
+    }
+
+ /**
+ * Builds a test table name from the class-wide prefix, "init_in_test_", the PK SQL type
+ * name, the salt bucket count and a "_multi" or "_single" tenancy suffix.
+ */
private static String getTableName(boolean isMultiTenant, PDataType
pkType, int saltBuckets) {
return prefix+"init_in_test_" + pkType.getSqlTypeName() + saltBuckets
+ (isMultiTenant ?
"_multi" :
"_single");
}
+ // Not final: assigned in doSetup() after setUpTestDriver(...) has run.
+ private static String TENANT_SPECIFIC_URL1;
+ // Tenant ids of the view-based tests are this prefix followed by a zero-padded
+ // six-digit tenant number (URL format "%s;%s=%s%06d").
+ private static final String TENANT_PREFIX = "Txt00tst1";
private static final String TENANT_ID = "ABC";
- private static final String TENANT_URL = getUrl() + ";" +
PhoenixRuntime.TENANT_ID_ATTRIB + '=' + TENANT_ID;
+ // Not final: assigned in doSetup() after setUpTestDriver(...) has run.
+ private static String TENANT_URL;
// the different combinations to check each test against
private static final List<Boolean> TENANCIES = Arrays.asList(false, true);
@@ -1762,4 +1888,383 @@ public class InListIT extends ParallelStatsDisabledIT {
assertEquals(2, r.getInt(1));
}
}
+
+    /**
+     * For every ASC/DESC combination on a three-column view PK, creates three tenant views.
+     * Their PK types are TIMESTAMP/VARCHAR/VARCHAR, VARCHAR/VARCHAR/VARCHAR and
+     * BIGINT/DECIMAL/BIGINT. Each view gets its type-specific checks, and its rows are deleted
+     * afterwards.
+     */
+    @Test
+    public void testWithVariousPKTypes() throws Exception {
+        final String baseTableName = String.format("TEST_ENTITY.CUSTOM_T%d", 1);
+        final int tenantId = 1;
+        final int numTestCases = 3;
+        final int numPkColumns = 3;
+
+        final PDataType[] tsVarVarTypes =
+                { PTimestamp.INSTANCE, PVarchar.INSTANCE, PVarchar.INSTANCE };
+        final PDataType[] varVarVarTypes =
+                { PVarchar.INSTANCE, PVarchar.INSTANCE, PVarchar.INSTANCE };
+        final PDataType[] intDecIntTypes =
+                { PLong.INSTANCE, PDecimal.INSTANCE, PLong.INSTANCE };
+
+        // Enumerate all 2^3 sort-order combinations in binary order (ASC = 0, DESC = 1, ID1 is
+        // the most significant bit): ASC,ASC,ASC first and DESC,DESC,DESC last.
+        for (int combo = 0; combo < (1 << numPkColumns); combo++) {
+            SortOrder[] orders = new SortOrder[numPkColumns];
+            for (int col = 0; col < numPkColumns; col++) {
+                boolean isDesc = ((combo >> (numPkColumns - 1 - col)) & 1) == 1;
+                orders[col] = isDesc ? SortOrder.DESC : SortOrder.ASC;
+            }
+            int ordinalBase = combo * numTestCases;
+
+            // Test Case 1: PK1 = Timestamp, PK2 = Varchar, PK3 = Varchar
+            String view1Name = createCaseView(tenantId, baseTableName, ordinalBase + 1,
+                    tsVarVarTypes, orders);
+            testTSVarVarPKs(tenantId, view1Name, orders);
+            dropTenantViewData(tenantId, view1Name);
+
+            // Test Case 2: PK1 = Varchar, PK2 = Varchar, PK3 = Varchar
+            String view2Name = createCaseView(tenantId, baseTableName, ordinalBase + 2,
+                    varVarVarTypes, orders);
+            testVarVarVarPKs(tenantId, view2Name, orders);
+            dropTenantViewData(tenantId, view2Name);
+
+            // Test Case 3: PK1 = Bigint, PK2 = Decimal, PK3 = Bigint
+            String view3Name = createCaseView(tenantId, baseTableName, ordinalBase + 3,
+                    intDecIntTypes, orders);
+            testIntDecIntPK(tenantId, view3Name, orders);
+            dropTenantViewData(tenantId, view3Name);
+        }
+    }
+
+    /**
+     * Creates the tenant view TEST_ENTITY.Z{viewOrdinal} over partition Z{viewOrdinal} with the
+     * given PK types and sort orders, and returns the view name.
+     */
+    private String createCaseView(int tenantId, String baseTableName, int viewOrdinal,
+            PDataType[] pkTypes, SortOrder[] orders) throws SQLException {
+        String viewName = String.format("TEST_ENTITY.Z%d", viewOrdinal);
+        String partition = String.format("Z%d", viewOrdinal);
+        createTenantView(tenantId, baseTableName, viewName, partition,
+                pkTypes[0], orders[0],
+                pkTypes[1], orders[1],
+                pkTypes[2], orders[2]);
+        return viewName;
+    }
+
+    // Test Case 1: PK1 = Timestamp, PK2 = Varchar, PK3 = Varchar
+    /**
+     * Loads 17 rows keyed by (timestamp, varchar, varchar) into {@code viewName}. It then checks
+     * two literal IN-list queries and the max-IN-list variant via assertExpectedWithMaxInList.
+     */
+    private void testTSVarVarPKs(int tenantId, String viewName, SortOrder[] sortOrder)
+            throws SQLException {
+        final String testName = "testTSVarVarPKs";
+        final long nowTime = System.currentTimeMillis();
+
+        // {ID1 offset from nowTime, ID2, ID3, ROW_ID}
+        final Object[][] rows = {
+            {0, "1", "5", "row01"},
+            {0, "1", "2", "row02"},
+            {0, "3", "5", "row03"},
+            {0, "3", "2", "row04"},
+            {1, "2", "3", "row11"},
+            {1, "2", "4", "row12"},
+            {1, "2", "2", "row13"},
+            {2, "4", "5", "row21"},
+            {2, "1", "5", "row22"},
+            {2, "1", "2", "row23"},
+            {2, "3", "5", "row24"},
+            {2, "3", "2", "row25"},
+            {3, "1", "5", "row31"},
+            {3, "1", "2", "row32"},
+            {3, "3", "5", "row33"},
+            {3, "3", "2", "row34"},
+            {3, "6", "7", "row35"},
+        };
+        final String upsertTemplate =
+                "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID) VALUES (%d, '%s', '%s', '%s')";
+        List<String> upsertSqls = Lists.newArrayListWithCapacity(rows.length);
+        for (Object[] row : rows) {
+            upsertSqls.add(String.format(upsertTemplate, viewName,
+                    nowTime + (Integer) row[0], row[1], row[2], row[3]));
+        }
+
+        String testSQL1 = String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN "
+                + "((%d, '21', '1'),(%d, '2', '31'))", viewName, nowTime, nowTime);
+        String testSQL2 = String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN "
+                + "((%d, '4', '5'),(%d, '2', '3'))", viewName, nowTime + 1, nowTime + 1);
+        // Left unclosed on purpose: assertExpectedWithMaxInList appends bind tuples and ')'.
+        String testMaxInList = String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN "
+                + "((%d, '4', '5'),(%d, '2', '3')", viewName, nowTime + 1, nowTime + 1);
+
+        final String url = String.format("%s;%s=%s%06d", getUrl(), TENANT_ID_ATTRIB,
+                TENANT_PREFIX, tenantId);
+        try (Connection conn = DriverManager.getConnection(url)) {
+            conn.setAutoCommit(true);
+            try (Statement upsertStatement = conn.createStatement()) {
+                for (String sql : upsertSqls) {
+                    upsertStatement.execute(sql);
+                }
+            }
+        }
+
+        Set<String> expectedNone = Collections.emptySet();
+        Set<String> expectedLiteral = Sets.newHashSet("row11");
+        Set<String> expectedMaxInList = Sets.newHashSet("row11");
+
+        assertExpectedWithWhere(tenantId, testName, testSQL1, expectedNone);
+        assertExpectedWithWhere(tenantId, testName, testSQL2, expectedLiteral);
+        PDataType[] pkTypes = { PTimestamp.INSTANCE, PVarchar.INSTANCE, PVarchar.INSTANCE };
+        assertExpectedWithMaxInList(tenantId, testName, pkTypes, testMaxInList, sortOrder,
+                expectedMaxInList);
+    }
+
+    // Test Case 2: PK1 = Varchar, PK2 = Varchar, PK3 = Varchar
+    /**
+     * Loads 19 rows keyed by three varchars into {@code viewName}. ID1 holds the textual form of
+     * a millisecond timestamp. It then checks two literal IN-list queries and the max-IN-list
+     * variant via assertExpectedWithMaxInList.
+     */
+    private void testVarVarVarPKs(int tenantId, String viewName, SortOrder[] sortOrder)
+            throws SQLException {
+        final String testName = "testVarVarVarPKs";
+        final long nowTime = System.currentTimeMillis();
+
+        // {ID1 offset from nowTime, ID2, ID3, ROW_ID}
+        final Object[][] rows = {
+            {0, 1, 5, "row01"},
+            {0, 1, 2, "row02"},
+            {0, 3, 5, "row03"},
+            {0, 3, 2, "row04"},
+            {1, 2, 3, "row11"},
+            {1, 2, 4, "row12"},
+            {1, 2, 2, "row13"},
+            {1, 1, 1, "row14"},
+            {1, 1, 2, "row15"},
+            {2, 4, 5, "row21"},
+            {2, 1, 5, "row22"},
+            {2, 1, 2, "row23"},
+            {2, 3, 6, "row24"},
+            {2, 3, 4, "row25"},
+            {2, 5, 6, "row26"},
+            {2, 5, 4, "row27"},
+            {3, 6, 7, "row3"},
+            {3, 2, 5, "row4"},
+            {3, 5, 3, "row5"},
+        };
+        final String upsertTemplate =
+                "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID) VALUES ('%s', '%s', '%s', '%s')";
+        List<String> upsertSqls = Lists.newArrayListWithCapacity(rows.length);
+        for (Object[] row : rows) {
+            upsertSqls.add(String.format(upsertTemplate, viewName,
+                    nowTime + (Integer) row[0], row[1], row[2], row[3]));
+        }
+
+        String testSQL1 = String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN "
+                + "(('%s', '21', '1'),('%s', '2', '31'))", viewName, nowTime, nowTime);
+        String testSQL2 = String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN "
+                + "(('%s', '4', '5'),('%s', '2', '3'))", viewName, nowTime + 2, nowTime + 1);
+        // Left unclosed on purpose: assertExpectedWithMaxInList appends bind tuples and ')'.
+        String testMaxInList = String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN "
+                + "(('%s', '4', '5'),('%s', '2', '3')", viewName, nowTime + 2, nowTime + 1);
+
+        final String url = String.format("%s;%s=%s%06d", getUrl(), TENANT_ID_ATTRIB,
+                TENANT_PREFIX, tenantId);
+        try (Connection conn = DriverManager.getConnection(url)) {
+            conn.setAutoCommit(true);
+            try (Statement upsertStatement = conn.createStatement()) {
+                for (String sql : upsertSqls) {
+                    upsertStatement.execute(sql);
+                }
+            }
+        }
+
+        Set<String> expectedNone = Collections.emptySet();
+        Set<String> expectedLiteral = Sets.newHashSet("row21", "row11");
+        Set<String> expectedMaxInList = Sets.newHashSet("row21", "row11");
+
+        assertExpectedWithWhere(tenantId, testName, testSQL1, expectedNone);
+        assertExpectedWithWhere(tenantId, testName, testSQL2, expectedLiteral);
+        PDataType[] pkTypes = { PVarchar.INSTANCE, PVarchar.INSTANCE, PVarchar.INSTANCE };
+        assertExpectedWithMaxInList(tenantId, testName, pkTypes, testMaxInList, sortOrder,
+                expectedMaxInList);
+    }
+
+    /**
+     * Loads five rows keyed by (bigint, decimal, bigint) into {@code viewName}. It then checks
+     * IN lists over several subsets and orderings of the PK columns, and the max-IN-list variant
+     * via assertExpectedWithMaxInList.
+     * <p>
+     * All SQL text is formatted with {@link java.util.Locale#ROOT}. {@code %f} is
+     * locale-sensitive, so under e.g. a German default locale 2.0 would render as "2,000000"
+     * and produce an invalid UPSERT.
+     */
+    private void testIntDecIntPK(int tenantId, String viewName, SortOrder[] sortOrder)
+            throws SQLException {
+        final java.util.Locale sqlLocale = java.util.Locale.ROOT;
+        final String testName = "testIntDecIntPK";
+        final long nowTime = System.currentTimeMillis();
+
+        // {ID1 offset from nowTime, ID2, ID3, ROW_ID}
+        final Object[][] rows = {
+            {0, 2.0, 3, "row0"},
+            {1, 2.0, 3, "row1"},
+            {1, 2.0, 5, "row2"},
+            {2, 4.0, 5, "row3"},
+            {3, 6.0, 7, "row4"},
+        };
+        final String upsertTemplate =
+                "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID) VALUES (%d, %f, %d, '%s')";
+        List<String> upsertSqls = Lists.newArrayListWithCapacity(rows.length);
+        for (Object[] row : rows) {
+            upsertSqls.add(String.format(sqlLocale, upsertTemplate, viewName,
+                    nowTime + (Integer) row[0], row[1], row[2], row[3]));
+        }
+
+        String testSQL1 = String.format(sqlLocale, "SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) "
+                + "IN ((%d, 21, 1),(%d, 2, 31))", viewName, nowTime, nowTime);
+        String testSQL2 = String.format(sqlLocale, "SELECT ROW_ID FROM %s WHERE (ID2, ID3) "
+                + "IN ((21.0, 1),(2.0, 3))", viewName);
+        String testSQL3 = String.format(sqlLocale, "SELECT ROW_ID FROM %s WHERE (ID1, ID2) "
+                + "IN ((%d, 21.0),(%d, 2.0))", viewName, nowTime + 1, nowTime + 1);
+        String testSQL4 = String.format(sqlLocale, "SELECT ROW_ID FROM %s WHERE (ID3, ID2, ID1) "
+                + "IN ((3, 21.0, %d),(3, 2.0, %d))", viewName, nowTime + 1, nowTime + 1);
+        String testSQL5 = String.format(sqlLocale, "SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) "
+                + "IN ((%d, 21.0, 3),(%d, 2.0, 3))", viewName, nowTime + 1, nowTime + 1);
+        // Left unclosed on purpose: assertExpectedWithMaxInList appends bind tuples and ')'.
+        String testMaxInList = String.format(sqlLocale, "SELECT ROW_ID FROM %s WHERE "
+                + "(ID1, ID2, ID3) IN ((%d, 21.0, 3),(%d, 2.0, 3)", viewName, nowTime + 1,
+                nowTime + 1);
+
+        final String url = String.format(sqlLocale, "%s;%s=%s%06d", getUrl(), TENANT_ID_ATTRIB,
+                TENANT_PREFIX, tenantId);
+        try (Connection conn = DriverManager.getConnection(url)) {
+            conn.setAutoCommit(true);
+            try (Statement upsertStatement = conn.createStatement()) {
+                for (String sql : upsertSqls) {
+                    upsertStatement.execute(sql);
+                }
+            }
+        }
+
+        Set<String> expecteds1 = Collections.emptySet();
+        Set<String> expecteds2 = Sets.newHashSet("row0", "row1");
+        Set<String> expecteds3 = Sets.newHashSet("row1", "row2");
+        Set<String> expecteds4 = Sets.newHashSet("row1");
+        Set<String> expecteds5 = Sets.newHashSet("row1");
+        Set<String> expecteds6 = Sets.newHashSet("row1");
+
+        assertExpectedWithWhere(tenantId, testName, testSQL1, expecteds1);
+        assertExpectedWithWhere(tenantId, testName, testSQL2, expecteds2);
+        assertExpectedWithWhere(tenantId, testName, testSQL3, expecteds3);
+        assertExpectedWithWhere(tenantId, testName, testSQL4, expecteds4);
+        assertExpectedWithWhere(tenantId, testName, testSQL5, expecteds5);
+        PDataType[] pkTypes = { PLong.INSTANCE, PDecimal.INSTANCE, PLong.INSTANCE };
+        assertExpectedWithMaxInList(tenantId, testName, pkTypes, testMaxInList, sortOrder,
+                expecteds6);
+    }
+
+    /**
+     * Runs {@code testSQL} on a tenant-specific connection and asserts that the first-column
+     * values returned are exactly {@code expectedSet}. The row count must match and the set of
+     * values must be equal, so a duplicated row cannot hide a missing expected row.
+     *
+     * @param tenantId    numeric tenant suffix, appended zero-padded to TENANT_PREFIX
+     * @param testType    label included in failure messages
+     * @param testSQL     query whose first column is compared
+     * @param expectedSet the exact set of expected values (assumed distinct row ids)
+     * @throws SQLException if the connection or query fails
+     */
+    private void assertExpectedWithWhere(int tenantId, String testType,
+            String testSQL, Set<String> expectedSet) throws SQLException {
+        String context = "sql: " + testSQL + ", type: " + testType;
+
+        String tenantConnectionUrl = String.format("%s;%s=%s%06d", getUrl(),
+                TENANT_ID_ATTRIB, TENANT_PREFIX, tenantId);
+        // All JDBC resources are closed deterministically; previously the Statement and
+        // ResultSet were left open.
+        try (Connection tenantConnection = DriverManager.getConnection(tenantConnectionUrl);
+                Statement stmt = tenantConnection.createStatement();
+                ResultSet rs = stmt.executeQuery(testSQL)) {
+            List<String> actual = Lists.newArrayList();
+            while (rs.next()) {
+                actual.add(rs.getString(1));
+            }
+            assertEquals("unexpected row count, got " + actual + " (" + context + ")",
+                    expectedSet.size(), actual.size());
+            assertEquals("did not return exactly '" + expectedSet + "' (" + context + ")",
+                    expectedSet, Sets.newHashSet(actual));
+        }
+    }
+
+    /**
+     * Pads {@code testSQL} with 25 bound (?,?,?) tuples holding random values, then verifies
+     * the plan and results. {@code testSQL} must end with an unclosed IN list.
+     * <p>
+     * A point lookup is expected unless the cardinality check is enabled and some PK column is
+     * DESC; in that case a range scan is expected. The ROW_ID values returned must then be
+     * exactly {@code expected}.
+     * <p>
+     * The random seed is part of every failure message so a failing run can be reproduced.
+     *
+     * @throws SQLException if the connection, compilation or query fails
+     */
+    private void assertExpectedWithMaxInList(int tenantId, String testType,
+            PDataType[] testPKTypes, String testSQL, SortOrder[] sortOrder,
+            Set<String> expected) throws SQLException {
+        final int numInLists = 25;
+        final int numInListCols = 3;
+        final long seed = System.nanoTime();
+        final Random rnd = new Random(seed);
+        String context = "sql: " + testSQL + ", type: " + testType + ", sort-order: "
+                + Arrays.stream(sortOrder).map(SortOrder::name).collect(Collectors.joining(","))
+                + ", seed: " + seed;
+        boolean expectSkipScan = !checkMaxSkipScanCardinality
+                || Arrays.stream(sortOrder).allMatch(Predicate.isEqual(SortOrder.ASC));
+
+        StringBuilder query = new StringBuilder(testSQL);
+        for (int i = 0; i < numInLists; i++) {
+            query.append(",(?,?,?)");
+        }
+        query.append(")");
+        final String sql = query.toString();
+
+        String tenantConnectionUrl = String.format("%s;%s=%s%06d", getUrl(),
+                TENANT_ID_ATTRIB, TENANT_PREFIX, tenantId);
+        try (Connection tenantConnection = DriverManager.getConnection(tenantConnectionUrl);
+                PreparedStatement prepared = tenantConnection.prepareStatement(sql)) {
+            PhoenixPreparedStatement stmt = prepared.unwrap(PhoenixPreparedStatement.class);
+            for (int i = 0; i < numInLists; i++) {
+                for (int col = 0; col < testPKTypes.length; col++) {
+                    int colIndex = i * numInListCols + col + 1;
+                    switch (testPKTypes[col].getSqlType()) {
+                        case Types.CHAR:
+                            // CHAR(15)
+                            stmt.setString(colIndex, randomInListString(rnd, 15));
+                            break;
+                        case Types.DECIMAL:
+                            // DECIMAL(8,2)
+                            stmt.setDouble(colIndex, rnd.nextDouble());
+                            break;
+                        case Types.INTEGER:
+                            stmt.setInt(colIndex, rnd.nextInt(50000));
+                            break;
+                        case Types.BIGINT:
+                            stmt.setLong(colIndex, System.currentTimeMillis() + rnd.nextInt(50000));
+                            break;
+                        case Types.DATE:
+                            stmt.setDate(colIndex,
+                                    new Date(System.currentTimeMillis() + rnd.nextInt(50000)));
+                            break;
+                        case Types.TIMESTAMP:
+                            stmt.setTimestamp(colIndex,
+                                    new Timestamp(System.currentTimeMillis() + rnd.nextInt(50000)));
+                            break;
+                        case Types.VARCHAR:
+                        default:
+                            // VARCHAR(25); unlisted types are also declared VARCHAR(25) by
+                            // getType, so cap at 25 characters (the old default case did not).
+                            stmt.setString(colIndex, randomInListString(rnd, 25));
+                    }
+                }
+            }
+
+            QueryPlan plan = stmt.compileQuery(sql);
+            String explainPlan = plan.getExplainPlan().toString();
+            String expectedScan = expectSkipScan
+                    ? "CLIENT PARALLEL 1-WAY POINT LOOKUP ON"
+                    : "CLIENT PARALLEL 1-WAY RANGE SCAN OVER";
+            assertTrue("expected '" + expectedScan + "' in plan " + explainPlan
+                    + " (" + context + ")", explainPlan.contains(expectedScan));
+
+            // JDBC: executeQuery(String) must not be called on a PreparedStatement; the no-arg
+            // form runs the prepared SQL with its bound parameters.
+            try (ResultSet rs = stmt.executeQuery()) {
+                List<String> actual = Lists.newArrayList();
+                while (rs.next()) {
+                    actual.add(rs.getString(1));
+                }
+                assertEquals("unexpected row count, got " + actual + " (" + context + ")",
+                        expected.size(), actual.size());
+                assertEquals("did not return exactly '" + expected + "' (" + context + ")",
+                        expected, Sets.newHashSet(actual));
+            }
+        }
+    }
+
+    /**
+     * Returns a non-empty substring of [0-9a-z] that starts at a random offset and is at most
+     * {@code maxLen} characters long.
+     */
+    private static String randomInListString(Random rnd, int maxLen) {
+        final String alphabet = "1234567890abcdefghijklmnopqrstuvwxyz";
+        // begin < 34 leaves at least 3 characters of the 36-character alphabet.
+        int begin = rnd.nextInt(34);
+        int end = Math.min(alphabet.length(), begin + maxLen);
+        return alphabet.substring(begin, end);
+    }
+
+    /**
+     * Queries a view whose five INTEGER PK columns are all DESC with an IN list of 100
+     * five-column tuples. It asserts that COUNT(*) returns every upserted row. Only runs when
+     * the skip-scan cardinality check is enabled.
+     */
+    @Test
+    public void testSkipScanCardinalityOverflow() throws SQLException {
+        if (!checkMaxSkipScanCardinality) {
+            return;
+        }
+
+        final String baseTableName = generateUniqueName();
+        final String viewName = String.format("Z_%s", baseTableName);
+        final int tenantId = 1;
+        final int numPkCols = 5;
+        // 100 ^ 5 ( # of pk cols in view) will cause integer overflow
+        final int totalRows = 100;
+
+        final String createDDL = "CREATE TABLE IF NOT EXISTS " + baseTableName
+                + "(OID CHAR(15) NOT NULL, KP CHAR(3) NOT NULL, CREATED_DATE DATE, "
+                + "CREATED_BY CHAR(15), SYSTEM_MODSTAMP DATE "
+                + "CONSTRAINT PK PRIMARY KEY (OID, KP)) MULTI_TENANT=true,COLUMN_ENCODED_BYTES=0";
+        try (Connection globalConnection = DriverManager.getConnection(getUrl());
+                Statement ddlStatement = globalConnection.createStatement()) {
+            ddlStatement.execute(createDDL);
+        }
+
+        // DESC order in PK causes key explosion when creating skip scan
+        final String viewDDL = String.format("CREATE VIEW IF NOT EXISTS %s "
+                + "(ID1 INTEGER not null, ID2 INTEGER not null, ID3 INTEGER not null, "
+                + "ID4 INTEGER not null, ID5 INTEGER not null, COL1 VARCHAR(15) "
+                + "CONSTRAINT pk PRIMARY KEY (ID1 DESC, ID2 DESC, ID3 DESC, ID4 DESC, ID5 DESC)) "
+                + "AS SELECT * FROM %s WHERE KP = 'abc'", viewName, baseTableName);
+        final String dml = "UPSERT INTO " + viewName
+                + "(CREATED_DATE, CREATED_BY, SYSTEM_MODSTAMP, ID1, ID2, ID3, ID4, ID5, COL1)"
+                + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        final String selectQuery = "SELECT COUNT(*) FROM " + viewName
+                + " WHERE (ID1, ID2, ID3, ID4, ID5) IN ("
+                + String.join(",", Collections.nCopies(totalRows, "(?, ?, ?, ?, ?)"))
+                + ")";
+
+        String tenantConnectionUrl = String.format("%s;%s=%s%06d", getUrl(),
+                TENANT_ID_ATTRIB, TENANT_PREFIX, tenantId);
+        try (Connection tenantConnection = DriverManager.getConnection(tenantConnectionUrl)) {
+            try (Statement ddlStatement = tenantConnection.createStatement()) {
+                ddlStatement.execute(viewDDL);
+            }
+
+            try (PreparedStatement upsert = tenantConnection.prepareStatement(dml)) {
+                long now = EnvironmentEdgeManager.currentTimeMillis();
+                for (int row = 0; row < totalRows; row++) {
+                    upsert.setDate(1, new Date(now + row));
+                    upsert.setString(2, "foo");
+                    upsert.setDate(3, new Date(now + row));
+                    // Parameters 4..8 are the five PK columns, all set to the row number.
+                    for (int param = 4; param < 4 + numPkCols; param++) {
+                        upsert.setInt(param, row);
+                    }
+                    upsert.setString(4 + numPkCols, "COL1_" + row);
+                    upsert.execute();
+                }
+                tenantConnection.commit();
+            }
+
+            try (PreparedStatement select = tenantConnection.prepareStatement(selectQuery)) {
+                int paramIndex = 0;
+                for (int row = 0; row < totalRows; row++) {
+                    for (int col = 0; col < numPkCols; col++) {
+                        select.setInt(++paramIndex, row);
+                    }
+                }
+                ResultSet rs = select.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(totalRows, rs.getInt(1));
+            }
+        }
+    }
+
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 65bbbfdc27..6491d7d138 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.compile;
+import java.math.BigInteger;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,6 +30,10 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.phoenix.expression.DelegateExpression;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -114,16 +119,19 @@ public class WhereOptimizer {
PName tenantId = context.getConnection().getTenantId();
byte[] tenantIdBytes = null;
PTable table = context.getCurrentTable().getTable();
- Integer nBuckets = table.getBucketNum();
- boolean isSalted = nBuckets != null;
- RowKeySchema schema = table.getRowKeySchema();
- boolean isMultiTenant = tenantId != null && table.isMultiTenant();
- boolean isSharedIndex = table.getViewIndexId() != null;
- ImmutableBytesWritable ptr = context.getTempPtr();
-
- if (isMultiTenant) {
+ Integer nBuckets = table.getBucketNum();
+ boolean isSalted = nBuckets != null;
+ RowKeySchema schema = table.getRowKeySchema();
+ boolean isMultiTenant = tenantId != null && table.isMultiTenant();
+ boolean isSharedIndex = table.getViewIndexId() != null;
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ int maxInListSkipScanSize =
context.getConnection().getQueryServices().getConfiguration()
+ .getInt(QueryServices.MAX_IN_LIST_SKIP_SCAN_SIZE,
+
QueryServicesOptions.DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE);
+
+ if (isMultiTenant) {
tenantIdBytes = ScanUtil.getTenantIdBytes(schema, isSalted,
tenantId, isSharedIndex);
- }
+ }
if (whereClause == null && (tenantId == null ||
!table.isMultiTenant()) && table.getViewIndexId() == null &&
!minOffset.isPresent()) {
context.setScanRanges(ScanRanges.EVERYTHING);
@@ -198,6 +206,9 @@ public class WhereOptimizer {
boolean hasMultiRanges = false;
boolean hasRangeKey = false;
boolean useSkipScan = false;
+ boolean checkMaxSkipScanCardinality = false;
+ BigInteger inListSkipScanCardinality = BigInteger.ONE; // using
BigInteger to avoid overflow issues
+
// Concat byte arrays of literals to form scan start key
while (iterator.hasNext()) {
@@ -226,6 +237,13 @@ public class WhereOptimizer {
boolean stopExtracting = false;
// Iterate through all spans of this slot
boolean areAllSingleKey = KeyRange.areAllSingleKey(keyRanges);
+ boolean isInList = false;
+ int cnfStartPos = cnf.size();
+
+ if (keyPart.getExtractNodes() != null &&
keyPart.getExtractNodes().size() > 0
+ && keyPart.getExtractNodes().get(0) instanceof
InListExpression){
+ isInList = true;
+ }
while (true) {
SortOrder sortOrder =
schema.getField(slot.getPKPosition() +
slotOffset).getSortOrder();
@@ -269,6 +287,9 @@ public class WhereOptimizer {
pkPos = slot.getPKPosition() + slotOffset;
clipLeftSpan = 0;
prevSortOrder = sortOrder;
+ // If we had an IN clause with mixed sort ordering then we
need to check the possibility of
+ // skip scan key generation explosion.
+ checkMaxSkipScanCardinality |= isInList;
// since we have to clip the portion with the same sort
order, we can no longer
// extract the nodes from the where clause
// for eg. for the schema A VARCHAR DESC, B VARCHAR ASC
and query
@@ -301,6 +322,30 @@ public class WhereOptimizer {
slotSpanArray[cnf.size()] = clipLeftSpan-1;
cnf.add(keyRanges);
}
+
+ // Do not use the skipScanFilter when there is a large IN clause
(e.g. > 50k elements)
+ // Since the generation of point keys for skip scan filter will
blow up the memory usage.
+ // See ScanRanges.getPointKeys(...) where using the various slot
key ranges
+ // to generate point keys will lead to combinatorial explosion.
+ // The following check will ensure the cardinality of generated
point keys
+ // is below the configured max (maxInListSkipScanSize).
+ // We shall force a range scan if the configured max is exceeded.
+ // cnfStartPos => is the start slot of this IN list
+ if (checkMaxSkipScanCardinality) {
+ for (int i = cnfStartPos; i < cnf.size(); i++) {
+ // using int can result in overflow
+ inListSkipScanCardinality =
+
inListSkipScanCardinality.multiply(BigInteger.valueOf(cnf.get(i).size()));
+ }
+ // If the maxInListSkipScanSize <= 0 then the feature (to
force range scan) is turned off
+ if (maxInListSkipScanSize > 0) {
+ forcedRangeScan =
+
inListSkipScanCardinality.compareTo(BigInteger.valueOf(maxInListSkipScanSize))
== 1 ? true : false;
+ }
+ // Reset the check flag for the next IN list clause
+ checkMaxSkipScanCardinality = false;
+ }
+
// TODO: when stats are available, we may want to use a skip scan
if the
// cardinality of this slot is low.
/**
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 33d0430272..94617e2918 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -375,6 +375,9 @@ public interface QueryServices extends SQLCloseable {
*/
String SOURCE_OPERATION_ATTRIB = "phoenix.source.operation";
+ // The max number of point keys that can be generated for a large IN list clause
+ public static final String MAX_IN_LIST_SKIP_SCAN_SIZE =
"phoenix.max.inList.skipScan.size";
+
/**
* Get executor service used for parallel scans
*/
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 379eadd7f7..56ff09b128 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -98,6 +98,7 @@ import static
org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.USE_STATS_FOR_PARALLELIZATION;
import static
org.apache.phoenix.query.QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD;
import static
org.apache.phoenix.query.QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.MAX_IN_LIST_SKIP_SCAN_SIZE;
import java.util.Map.Entry;
@@ -177,6 +178,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 2999;
public static final boolean DEFAULT_IS_NAMESPACE_MAPPING_ENABLED = false;
public static final boolean DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE =
true;
+ public static final int DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE = 50000;
//
// Spillable GroupBy - SPGBY prefix
@@ -459,7 +461,8 @@ public class QueryServicesOptions {
.setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG)
.setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD,
DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD)
.setIfUnset(PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED,
DEFAULT_SERVER_SIDE_MASKING_ENABLED)
- ;
+ .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE,
DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE);
+
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user
set
// it to 1, so we'll change it.