This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 354716eed9 PHOENIX-7213 Add option for unlimited phoenix.query.QueueSize
354716eed9 is described below
commit 354716eed9ef6a172ad98882194db6928385f414
Author: tkhurana <[email protected]>
AuthorDate: Wed Feb 14 10:07:05 2024 -0800
PHOENIX-7213 Add option for unlimited phoenix.query.QueueSize
---
.../phoenix/job/AbstractRoundRobinQueue.java | 16 +++++-
.../apache/phoenix/query/QueryServicesOptions.java | 31 +++++-----
.../apache/phoenix/end2end/QueryWithLimitIT.java | 67 ++++++++++++++--------
3 files changed, 72 insertions(+), 42 deletions(-)
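
The new QueryServicesOptions.UNLIMITED_QUEUE_SIZE constant (-1) removes the bound on the round-robin job queue, so work is no longer rejected once the configured limit of pending tasks is reached. A minimal client-side sketch of opting into this behaviour follows; it assumes the property is honoured when the first connection for the JDBC URL is created, and the connection URL and the SYSTEM.CATALOG probe query are illustrative placeholders rather than part of this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;

public class UnlimitedQueueSizeExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // -1 (UNLIMITED_QUEUE_SIZE) lifts the cap on queued parallel-scan tasks
        // instead of failing them with RejectedExecutionException.
        props.setProperty(QueryServices.QUEUE_SIZE_ATTRIB,
                Integer.toString(QueryServicesOptions.UNLIMITED_QUEUE_SIZE));
        // Placeholder URL; point it at your own ZooKeeper quorum.
        try (Connection conn =
                DriverManager.getConnection("jdbc:phoenix:localhost:2181", props)) {
            conn.createStatement()
                .executeQuery("SELECT TABLE_NAME FROM SYSTEM.CATALOG LIMIT 1")
                .close();
        }
    }
}
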
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
index fa68852524..001453923e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
@@ -17,12 +17,22 @@
*/
package org.apache.phoenix.job;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import java.util.*;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import static org.apache.phoenix.query.QueryServicesOptions.UNLIMITED_QUEUE_SIZE;
+
/**
*
* A bounded blocking queue implementation that keeps a virtual queue of elements on per-producer
@@ -88,7 +98,7 @@ public abstract class AbstractRoundRobinQueue<E> extends AbstractQueue<E>
ProducerList<E> producerList = null;
synchronized(lock) {
- if (this.size == this.maxSize) {
+ if (this.maxSize != UNLIMITED_QUEUE_SIZE && this.size == this.maxSize) {
return false;
}
producerList = this.producerMap.get(producerKey);
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index f88ec01a5a..2d8d796a4f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -141,20 +141,21 @@ import org.apache.phoenix.util.ReadOnlyProps;
* @since 0.1
*/
public class QueryServicesOptions {
- public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
- public static final int DEFAULT_THREAD_POOL_SIZE = 128;
- public static final int DEFAULT_QUEUE_SIZE = 5000;
- public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
- public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
- public static final int DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
- public static final int DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
- public static final boolean DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED = true;
- public static final boolean DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED = true;
- public static final boolean DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED = true;
+ public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
+ public static final int DEFAULT_THREAD_POOL_SIZE = 128;
+ public static final int DEFAULT_QUEUE_SIZE = 5000;
+ public static final int UNLIMITED_QUEUE_SIZE = -1;
+ public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
+ public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+ public static final int DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+ public static final int DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+ public static final boolean DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED = true;
+ public static final boolean DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED = true;
+ public static final boolean DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED = true;
public static final String DEFAULT_SPOOL_DIRECTORY = System.getProperty("java.io.tmpdir");
- public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
- public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
- public static final long DEFAULT_MAX_SERVER_CACHE_SIZE = 1024*1024*100; // 100 Mb
+ public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
+ public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
+ public static final long DEFAULT_MAX_SERVER_CACHE_SIZE = 1024 * 1024 * 100; // 100 Mb
public static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 32;
public static final int DEFAULT_MAX_QUERY_CONCURRENCY = 64;
public static final String DEFAULT_DATE_FORMAT = DateUtil.DEFAULT_DATE_FORMAT;
@@ -178,7 +179,7 @@ public class QueryServicesOptions {
public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE
//Batch size in bytes for UPSERT, SELECT and DELETE. By default, 2MB
public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152;
- // The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be.
+ // The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be.
public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity)
public static final int DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS = 30 * 60000; // 30 minutes
public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
@@ -406,7 +407,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_LOG_SALT_BUCKETS = 32;
public static final int DEFAULT_SALT_BUCKETS = 0;
- public static final boolean DEFAULT_SYSTEM_CATALOG_SPLITTABLE = true;
+ public static final boolean DEFAULT_SYSTEM_CATALOG_SPLITTABLE = true;
public static final String DEFAULT_GUIDE_POSTS_CACHE_FACTORY_CLASS = "org.apache.phoenix.query.DefaultGuidePostsCacheFactory";
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
index 2f778c0fc6..377b6d7cfc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.query.QueryServicesOptions.UNLIMITED_QUEUE_SIZE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -36,6 +38,7 @@ import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
@@ -49,25 +52,25 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
public class QueryWithLimitIT extends BaseTest {
private String tableName;
-
- @Before
- public void generateTableName() {
- tableName = generateUniqueName();
- }
-
+ private static Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
@BeforeClass
public static synchronized void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
// Must update config before starting server
props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50));
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1));
props.put(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, Integer.toString(0)); // Prevents RejectedExecutionException when creating sequence table
props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(4));
props.put(QueryServices.LOG_SALT_BUCKETS_ATTRIB, Integer.toString(0)); // Prevents RejectedExecutionException when creating log table
+ }
+
+ @Before
+ public void setupDriver() throws Exception {
+ destroyDriver();
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ tableName = generateUniqueName();
}
-
+
@Test
public void testQueryWithLimitAndStats() throws Exception {
Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
@@ -106,26 +109,42 @@ public class QueryWithLimitIT extends BaseTest {
@Test
public void testQueryWithoutLimitFails() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
-
- conn.createStatement().execute("create table " + tableName + "\n" +
- " (i1 integer not null, i2 integer not null\n" +
- " CONSTRAINT pk PRIMARY KEY (i1,i2))");
- initTableValues(conn, 100);
- conn.createStatement().execute("UPDATE STATISTICS " + tableName);
-
+ Properties connProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
String query = "SELECT i1 FROM " + tableName;
+ try (Connection conn = DriverManager.getConnection(getUrl(), connProps)) {
+
+ conn.createStatement().execute("create table " + tableName + "\n" +
+ " (i1 integer not null, i2 integer not null\n" +
+ " CONSTRAINT pk PRIMARY KEY (i1,i2))");
+ initTableValues(conn, 100);
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ try {
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ rs.next();
+ fail();
+ } catch (SQLException e) {
+ assertTrue(e.getCause() instanceof RejectedExecutionException);
+ }
+ }
+
+ // now run the same test with queue size set to unlimited
try {
- ResultSet rs = conn.createStatement().executeQuery(query);
- rs.next();
- fail();
- } catch (SQLException e) {
- assertTrue(e.getCause() instanceof RejectedExecutionException);
+ destroyDriver();
+ // copy the existing properties
+ Map<String, String> newProps = Maps.newHashMap(props);
+ newProps.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(UNLIMITED_QUEUE_SIZE));
+ setUpTestDriver(new ReadOnlyProps(newProps.entrySet().iterator()));
+ try (Connection conn = DriverManager.getConnection(getUrl(), connProps)) {
+ // now the query should succeed
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ }
+ } finally {
+ destroyDriver();
}
- conn.close();
}
-
+
protected void initTableValues(Connection conn, int nRows) throws Exception {
PreparedStatement stmt = conn.prepareStatement(
"upsert into " + tableName +