This is an automated email from the ASF dual-hosted git repository.
lokiore pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d0a6358d8c PHOENIX-7502 :- Decouple principal from HAGroupInfo (#2053)
d0a6358d8c is described below
commit d0a6358d8c374e27d8b485bedb354467c28f473f
Author: Lokesh Khurana <[email protected]>
AuthorDate: Wed Jan 22 14:01:21 2025 -0800
PHOENIX-7502 :- Decouple principal from HAGroupInfo (#2053)
Co-authored-by: lokesh-khurana <[email protected]>
---
.../phoenix/jdbc/FailoverPhoenixConnection.java | 50 +++--
.../phoenix/jdbc/FailoverPhoenixContext.java | 49 +++++
.../java/org/apache/phoenix/jdbc/HAURLInfo.java | 104 ++++++++++
.../apache/phoenix/jdbc/HighAvailabilityGroup.java | 187 ++++++++++++++----
.../phoenix/jdbc/HighAvailabilityPolicy.java | 68 ++++---
.../phoenix/jdbc/ParallelPhoenixConnection.java | 18 +-
.../phoenix/jdbc/ParallelPhoenixContext.java | 10 +-
.../apache/phoenix/jdbc/PhoenixEmbeddedDriver.java | 8 +-
.../phoenix/jdbc/FailoverPhoenixConnectionIT.java | 145 ++++++++++++++
.../phoenix/jdbc/HighAvailabilityGroupIT.java | 215 +++++++++++++++++++--
.../phoenix/jdbc/HighAvailabilityGroupTestIT.java | 27 +--
.../jdbc/HighAvailabilityTestingUtility.java | 22 +++
.../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 13 +-
.../jdbc/FailoverPhoenixConnectionTest.java | 22 ++-
.../jdbc/ParallelPhoenixConnectionFailureTest.java | 2 +-
.../jdbc/ParallelPhoenixConnectionTest.java | 14 +-
.../phoenix/jdbc/ParallelPhoenixContextTest.java | 13 +-
.../ParallelPhoenixNullComparingResultSetTest.java | 3 +-
.../jdbc/ParallelPhoenixPreparedStatementTest.java | 3 +-
.../phoenix/jdbc/ParallelPhoenixResultSetTest.java | 15 +-
.../phoenix/jdbc/ParallelPhoenixUtilTest.java | 5 +-
21 files changed, 833 insertions(+), 160 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java
index 2ade5ef4de..0ad691958f 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java
@@ -72,14 +72,11 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
public static final String FAILOVER_TIMEOUT_MS_ATTR =
"phoenix.ha.failover.timeout.ms";
public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000;
private static final Logger LOG =
LoggerFactory.getLogger(FailoverPhoenixConnection.class);
+
/**
- * Connection properties.
- */
- private final Properties properties;
- /**
- * High availability group.
+ * Context for FailoverPhoenixConnection
*/
- private final HighAvailabilityGroup haGroup;
+ private final FailoverPhoenixContext context;
/**
* Failover policy, per connection.
*/
@@ -103,13 +100,13 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
*/
private Map<String, Map<MetricType, Long>> previousReadMetrics = new
HashMap<>();
- public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties
properties)
+ public FailoverPhoenixConnection(FailoverPhoenixContext context)
throws SQLException {
- this.properties = properties;
- this.haGroup = haGroup;
- this.policy = FailoverPolicy.get(properties);
+ this.context = context;
+ this.policy = FailoverPolicy.get(context.getProperties());
this.isClosed = false;
- this.connection = haGroup.connectActive(properties);
+ this.connection =
context.getHAGroup().connectActive(context.getProperties(),
+ context.getHAURLInfo());
}
/**
@@ -171,9 +168,9 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
void failover(long timeoutMs) throws SQLException {
checkConnection();
- if (haGroup.isActive(connection)) {
+ if (context.getHAGroup().isActive(connection)) {
LOG.info("Connection {} is against ACTIVE cluster in HA group {};
skip failing over.",
- connection.getURL(), haGroup.getGroupInfo().getName());
+ connection.getURL(),
context.getHAGroup().getGroupInfo().getName());
return;
}
@@ -183,7 +180,8 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
while (newConn == null &&
EnvironmentEdgeManager.currentTimeMillis() < startTime +
timeoutMs) {
try {
- newConn = haGroup.connectActive(properties);
+ newConn =
context.getHAGroup().connectActive(context.getProperties(),
+ context.getHAURLInfo());
} catch (SQLException e) {
cause = e;
LOG.info("Got exception when trying to connect to active
cluster.", e);
@@ -197,7 +195,7 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
}
if (newConn == null) {
throw new FailoverSQLException("Can not failover connection",
- haGroup.getGroupInfo().toString(), cause);
+ context.getHAGroup().getGroupInfo().toString(), cause);
}
final PhoenixConnection oldConn = connection;
@@ -217,7 +215,7 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
oldConn.close(new SQLExceptionInfo
.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
.setMessage("Phoenix connection got closed due to
failover")
- .setHaGroupInfo(haGroup.getGroupInfo().toString())
+
.setHaGroupInfo(context.getHAGroup().getGroupInfo().toString())
.build()
.buildException());
} catch (SQLException e) {
@@ -226,7 +224,8 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
}
}
}
- LOG.info("Connection {} failed over to {}", haGroup.getGroupInfo(),
connection.getURL());
+ LOG.info("Connection {} failed over to {}",
context.getHAGroup().getGroupInfo(),
+ connection.getURL());
}
/**
@@ -241,7 +240,7 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
private void checkConnection() throws SQLException {
if (isClosed) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CONNECTION_CLOSED)
- .setHaGroupInfo(haGroup.getGroupInfo().toString())
+
.setHaGroupInfo(context.getHAGroup().getGroupInfo().toString())
.build()
.buildException();
}
@@ -249,7 +248,7 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
throw new SQLExceptionInfo
.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
.setMessage("Connection has not been established to ACTIVE
HBase cluster")
- .setHaGroupInfo(haGroup.getGroupInfo().toString())
+
.setHaGroupInfo(context.getHAGroup().getGroupInfo().toString())
.build()
.buildException();
}
@@ -327,8 +326,9 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
@VisibleForTesting
<T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws
SQLException {
checkConnection();
- final long timeoutMs =
Long.parseLong(properties.getProperty(FAILOVER_TIMEOUT_MS_ATTR,
- String.valueOf(FAILOVER_TIMEOUT_MS_DEFAULT)));
+ final long timeoutMs = Long.parseLong(context.getProperties().
+ getProperty(FAILOVER_TIMEOUT_MS_ATTR,
+ String.valueOf(FAILOVER_TIMEOUT_MS_DEFAULT)));
int failoverCount = 0;
while (true) {
try {
@@ -642,4 +642,12 @@ public class FailoverPhoenixConnection implements
PhoenixMonitoredConnection {
interface RunWithSQLException {
void run() throws SQLException;
}
+
+ /**
+ * @return the context of a given FailoverPhoenixConnection
+ */
+ @VisibleForTesting
+ public FailoverPhoenixContext getContext() {
+ return context;
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixContext.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixContext.java
new file mode 100644
index 0000000000..873712cdf3
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.util.Properties;
+
+/**
+ * FailoverPhoenixContext holds the properties and HAGroup Info for a failover
phoenix connection.
+ */
+public class FailoverPhoenixContext {
+
+ private final Properties properties;
+ private final HighAvailabilityGroup haGroup;
+ private final HAURLInfo haurlInfo;
+
+ FailoverPhoenixContext(Properties properties, HighAvailabilityGroup
haGroup,
+ HAURLInfo haurlInfo) {
+ this.properties = properties;
+ this.haGroup = haGroup;
+ this.haurlInfo = haurlInfo;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public HighAvailabilityGroup getHAGroup() {
+ return haGroup;
+ }
+
+ public HAURLInfo getHAURLInfo() {
+ return haurlInfo;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAURLInfo.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAURLInfo.java
new file mode 100644
index 0000000000..5c9c4f9b39
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAURLInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * An HAURLInfo contains information of an HA Url with respect of HA Group
Name.
+ * <p>
+ * It is constructed based on client input, including the JDBC connection
string and properties.
+ * Objects of this class are used to get appropriate principal and additional
JDBC parameters.
+ * <p>
+ * This class is immutable.
+ */
+
+@VisibleForTesting
+public class HAURLInfo {
+ private final String name;
+ private final String principal;
+ private final String additionalJDBCParams;
+
+ HAURLInfo(String name, String principal, String additionalJDBCParams) {
+ Preconditions.checkNotNull(name);
+ this.name = name;
+ this.principal = principal;
+ this.additionalJDBCParams = additionalJDBCParams;
+ }
+
+ HAURLInfo(String name, String principal) {
+ this(name, principal, null);
+ }
+
+ HAURLInfo(String name) {
+ this(name, null, null);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public String getAdditionalJDBCParams() {
+ return additionalJDBCParams;
+ }
+
+ @Override
+ public String toString() {
+ if (principal != null) {
+ return String.format("%s[%s]", name, principal);
+ }
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other == this) {
+ return true;
+ }
+ if (other.getClass() != getClass()) {
+ return false;
+ }
+ HAURLInfo otherInfo = (HAURLInfo) other;
+ return new EqualsBuilder()
+ .append(name, otherInfo.name)
+ .append(principal, otherInfo.principal)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ if (principal != null) {
+ return new HashCodeBuilder(7, 47)
+ .append(name)
+ .append(principal).hashCode();
+ }
+ return new HashCodeBuilder(7, 47).append(name).hashCode();
+ }
+
+}
\ No newline at end of file
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index fb2137d106..c5a1c67be9 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -49,10 +49,14 @@ import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -124,8 +128,23 @@ public class HighAvailabilityGroup {
public static final long PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT = 5 * 60
* 1000; // 5 mins
static final Logger LOG =
LoggerFactory.getLogger(HighAvailabilityGroup.class);
+
+ /**
+ * Two maps to store client provided info mapping to HighAvailabilityGroup.
+ * GROUPS which store HAGroupInfo (name and url of clusters where CRR
resides)
+ * to HighAvailabilityGroup mapping, which is the information required to
get roleRecord
+ * and URLS which store HAGroupInfo to HAURLInfo (name, principal) 1:n
mapping
+ * which represents a given group of clients trying to connect to a
HighAvailabilityGroup,
+ * this info is required to fetch the CQSI(s) linked to given
HighAvailabilityGroup in case
+ * of failover or a change where CQSIs needs to be closed and invalidated
+ *
+ * HAURLInfo is stored in {@link ParallelPhoenixContext} and {@link
FailoverPhoenixContext}
+ * for the current given connection
+ *
+ */
@VisibleForTesting
static final Map<HAGroupInfo, HighAvailabilityGroup> GROUPS = new
ConcurrentHashMap<>();
+ static final Map<HAGroupInfo, Set<HAURLInfo>> URLS = new
ConcurrentHashMap<>();
@VisibleForTesting
static final Cache<HAGroupInfo, Boolean> MISSING_CRR_GROUPS_CACHE =
CacheBuilder.newBuilder()
.expireAfterWrite(PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT,
TimeUnit.MILLISECONDS)
@@ -195,28 +214,81 @@ public class HighAvailabilityGroup {
this.state = state;
}
- public static HAGroupInfo getHAGroupInfo(String url, Properties properties)
- throws SQLException {
- if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
- url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
- }
- if (!(url.contains("[") && url.contains("|") && url.contains("]"))) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
- .setMessage(String.format("URL %s is not a valid HA
connection string", url))
- .build()
- .buildException();
- }
+ /**
+ * Get an instance of HAURLInfo given the HA connecting URL (with "|") and
client properties.
+ * Here we do parsing of url and try to extract principal and other
additional params
+ * @throws SQLException
+ */
+ public static HAURLInfo getUrlInfo(String url, Properties properties)
throws SQLException {
+ url = checkUrl(url);
+ String principal = null;
String additionalJDBCParams = null;
int idx = url.indexOf("]");
int extraIdx = url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR, idx
+ 1);
if (extraIdx != -1) {
- // skip the JDBC_PROTOCOL_SEPARATOR
+ //after zk quorums there should be a separator
+ if (extraIdx != idx + 1) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+ .setMessage(String.format("URL %s is not a valid HA
connection string",
+ url))
+ .build()
+ .buildException();
+ }
additionalJDBCParams = url.substring(extraIdx + 1);
+ //Get the principal
+ extraIdx =
additionalJDBCParams.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ if (extraIdx != -1) {
+ if (extraIdx != 0) {
+ principal = additionalJDBCParams.substring(0, extraIdx);
+ }
+ //Storing terminator as part of additional Params
+ additionalJDBCParams = additionalJDBCParams.substring(extraIdx
+ 1);
+ } else {
+ extraIdx =
additionalJDBCParams.indexOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR);
+ if (extraIdx != -1) {
+ //Not storing terminator to make it consistent.
+ principal = additionalJDBCParams.substring(0, extraIdx);
+ additionalJDBCParams =
String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR);
+ } else {
+ principal = additionalJDBCParams;
+ additionalJDBCParams = null;
+ }
+ }
+ } else {
+ extraIdx = url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR,
idx + 1);
+ if (extraIdx != -1) {
+ //There is something in between zkquorum and terminator but no
separator(s),
+ //So not sure what it is
+ if (extraIdx != idx + 1) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+ .setMessage(String.format("URL %s is not a valid
HA connection string",
+ url))
+ .build()
+ .buildException();
+ } else {
+ additionalJDBCParams = url.substring(extraIdx);
+ }
+ }
}
- url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
- String[] urls = url.split("\\|");
+ String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR);
+ if (StringUtils.isEmpty(name)) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES)
+ .setMessage(String.format("HA group name can not be empty
for HA URL %s", url))
+ .build()
+ .buildException();
+ }
+ HAURLInfo haurlInfo = new HAURLInfo(name, principal,
additionalJDBCParams);
+ HAGroupInfo info = getHAGroupInfo(url, properties);
+ URLS.computeIfAbsent(info, haGroupInfo -> new
HashSet<>()).add(haurlInfo);
+ return haurlInfo;
+ }
+ private static HAGroupInfo getHAGroupInfo(String url, Properties
properties)
+ throws SQLException {
+ url = checkUrl(url);
+ url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
+ String [] urls = url.split("\\|");
String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR);
if (StringUtils.isEmpty(name)) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES)
@@ -224,7 +296,26 @@ public class HighAvailabilityGroup {
.build()
.buildException();
}
- return new HAGroupInfo(name, urls[0], urls[1], additionalJDBCParams);
+ return new HAGroupInfo(name, urls[0], urls[1]);
+ }
+
+ /**
+ * checks if the given url is appropriate for HA Connection
+ * @param url
+ * @return the url without protocol
+ * @throws SQLException
+ */
+ private static String checkUrl(String url) throws SQLException {
+ if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+ url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
+ }
+ if (!(url.contains("[") && url.contains("|") && url.contains("]"))) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+ .setMessage(String.format("URL %s is not a valid HA
connection string", url))
+ .build()
+ .buildException();
+ }
+ return url;
}
/**
@@ -488,7 +579,7 @@ public class HighAvailabilityGroup {
* @return a JDBC connection implementation
* @throws SQLException if fails to connect a JDBC connection
*/
- public Connection connect(Properties properties) throws SQLException {
+ public Connection connect(Properties properties, HAURLInfo haurlInfo)
throws SQLException {
if (state != State.READY) {
throw new SQLExceptionInfo
.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
@@ -497,7 +588,7 @@ public class HighAvailabilityGroup {
.build()
.buildException();
}
- return roleRecord.getPolicy().provide(this, properties);
+ return roleRecord.getPolicy().provide(this, properties, haurlInfo);
}
/**
@@ -509,11 +600,12 @@ public class HighAvailabilityGroup {
* @return a Phoenix connection to current active HBase cluster
* @throws SQLException if fails to get a connection
*/
- PhoenixConnection connectActive(final Properties properties) throws
SQLException {
+ PhoenixConnection connectActive(final Properties properties, final
HAURLInfo haurlInfo)
+ throws SQLException {
try {
Optional<String> url = roleRecord.getActiveUrl();
if (state == State.READY && url.isPresent()) {
- PhoenixConnection conn = connectToOneCluster(url.get(),
properties);
+ PhoenixConnection conn = connectToOneCluster(url.get(),
properties, haurlInfo);
// After connection is created, double check if the cluster is
still ACTIVE
// This is to make sure the newly created connection will not
be returned to client
// if the target cluster is not active any more. This can
happen during failover.
@@ -575,9 +667,10 @@ public class HighAvailabilityGroup {
* <p>
* The URL should belong to one of the two ZK clusters in this HA group.
It returns the Phoenix
* connection to the given cluster without checking the context of the
cluster's role. Please
- * use {@link #connectActive(Properties)} to connect to the ACTIVE cluster.
+ * use {@link #connectActive(Properties, HAURLInfo)} to connect to the
ACTIVE cluster.
*/
- PhoenixConnection connectToOneCluster(String url, Properties properties)
throws SQLException {
+ PhoenixConnection connectToOneCluster(String url, Properties properties,
HAURLInfo haurlInfo)
+ throws SQLException {
Preconditions.checkNotNull(url);
if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
Preconditions.checkArgument(url.length() >
PhoenixRuntime.JDBC_PROTOCOL.length(),
@@ -587,7 +680,7 @@ public class HighAvailabilityGroup {
Preconditions.checkArgument(url.equals(info.getUrl1()) ||
url.equals(info.getUrl2()),
"The URL '" + url + "' does not belong to this HA group " +
info);
- String jdbcString = info.getJDBCUrl(url);
+ String jdbcString = info.getJDBCUrl(url, haurlInfo);
ClusterRole role = roleRecord.getRole(url);
if (!role.canConnect()) {
@@ -742,7 +835,8 @@ public class HighAvailabilityGroup {
* An HAGroupInfo contains information of an HA group.
* <p>
* It is constructed based on client input, including the JDBC connection
string and properties.
- * Objects of this class are used as the keys of HA group cache {@link
#GROUPS}.
+ * Objects of this class are used as the keys of HA group cache {@link
#GROUPS} and HA url info cache
+ * {@link #URLS}.
* <p>
* This class is immutable.
*/
@@ -750,9 +844,8 @@ public class HighAvailabilityGroup {
static final class HAGroupInfo {
private final String name;
private final PairOfSameType<String> urls;
- private final String additionalJDBCParams;
- HAGroupInfo(String name, String url1, String url2, String
additionalJDBCParams) {
+ HAGroupInfo(String name, String url1, String url2) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(url1);
Preconditions.checkNotNull(url2);
@@ -766,11 +859,6 @@ public class HighAvailabilityGroup {
} else {
this.urls = new PairOfSameType<>(url1, url2);
}
- this.additionalJDBCParams = additionalJDBCParams;
- }
-
- HAGroupInfo(String name, String url1, String url2) {
- this(name, url1, url2, null);
}
public String getName() {
@@ -785,26 +873,47 @@ public class HighAvailabilityGroup {
return urls.getSecond();
}
- public String getJDBCUrl(String zkUrl) {
- Preconditions.checkArgument(zkUrl.equals(getUrl1()) ||
zkUrl.equals(getUrl2()),
+
+ public String getJDBCUrl(String zkUrl, HAURLInfo haURLInfo) {
+ Preconditions.checkArgument(zkUrl.equals(getUrl1()) ||
zkUrl.equals(getUrl2())
+ || zkUrl.equals("[" + getUrl1() + "|" + getUrl2()
+ "]")
+ || zkUrl.equals("[" + getUrl2() + "|" + getUrl1()
+ "]"),
"The URL '" + zkUrl + "' does not belong to this HA group
" + this);
StringBuilder sb = new StringBuilder();
sb.append(PhoenixRuntime.JDBC_PROTOCOL_ZK);
sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
sb.append(zkUrl);
- if (!Strings.isNullOrEmpty(additionalJDBCParams)) {
- sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
- sb.append(additionalJDBCParams);
+ if (haURLInfo != null) {
+ if (!Strings.isNullOrEmpty(haURLInfo.getPrincipal())
+ &&
!Strings.isNullOrEmpty(haURLInfo.getAdditionalJDBCParams())) {
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ sb.append(haURLInfo.getPrincipal());
+ if (!haURLInfo.getAdditionalJDBCParams().
+
equals(String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR))) {
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ }
+ sb.append(haURLInfo.getAdditionalJDBCParams());
+ } else if (!Strings.isNullOrEmpty(haURLInfo.getPrincipal())) {
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ sb.append(haURLInfo.getPrincipal());
+ } else if
(!Strings.isNullOrEmpty(haURLInfo.getAdditionalJDBCParams())) {
+ if (!haURLInfo.getAdditionalJDBCParams().
+
equals(String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR))) {
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ }
+ sb.append(haURLInfo.getAdditionalJDBCParams());
+ }
}
return sb.toString();
}
- public String getJDBCUrl1() {
- return getJDBCUrl(getUrl1());
+ public String getJDBCUrl1(HAURLInfo haURLInfo) {
+ return getJDBCUrl(getUrl1(), haURLInfo);
}
- public String getJDBCUrl2() {
- return getJDBCUrl(getUrl2());
+ public String getJDBCUrl2(HAURLInfo haURLInfo) {
+ return getJDBCUrl(getUrl2(), haURLInfo);
}
/**
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
index 93d2661864..7a82d47a93 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
@@ -39,9 +39,10 @@ import org.slf4j.LoggerFactory;
enum HighAvailabilityPolicy {
FAILOVER {
@Override
- public Connection provide(HighAvailabilityGroup haGroup, Properties
info)
- throws SQLException {
- return new FailoverPhoenixConnection(haGroup, info);
+ public Connection provide(HighAvailabilityGroup haGroup, Properties
info,
+ HAURLInfo haURLInfo) throws SQLException {
+ FailoverPhoenixContext context = new FailoverPhoenixContext(info,
haGroup, haURLInfo);
+ return new FailoverPhoenixConnection(context);
}
@Override
void transitClusterRole(HighAvailabilityGroup haGroup,
ClusterRoleRecord oldRecord,
@@ -65,50 +66,61 @@ enum HighAvailabilityPolicy {
LOG.info("Cluster {} becomes STANDBY in HA group {}, now close all
its connections",
zkUrl, haGroup.getGroupInfo());
ConnectionQueryServices cqs = null;
- try {
- cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(
- haGroup.getGroupInfo().getJDBCUrl(zkUrl),
haGroup.getProperties());
- cqs.closeAllConnections(new SQLExceptionInfo
- .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
- .setMessage("Phoenix connection got closed due to
failover")
- .setHaGroupInfo(haGroup.getGroupInfo().toString()));
- LOG.info("Closed all connections to cluster {} for HA group
{}", zkUrl,
- haGroup.getGroupInfo());
- } finally {
- if (cqs != null) {
- // CQS is closed but it is not invalidated from global
cache in PhoenixDriver
- // so that any new connection will get error instead of
creating a new CQS
- LOG.info("Closing CQS after cluster '{}' becomes STANDBY",
zkUrl);
- cqs.close();
- LOG.info("Successfully closed CQS after cluster '{}'
becomes STANDBY", zkUrl);
+
+ //Close connections for every HAURLInfo's (different principal)
conn for a give HAGroup
+ for (HAURLInfo haurlInfo :
HighAvailabilityGroup.URLS.get(haGroup.getGroupInfo())) {
+ try {
+ String jdbcZKUrl =
haGroup.getGroupInfo().getJDBCUrl(zkUrl, haurlInfo);
+ cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(
+ jdbcZKUrl, haGroup.getProperties());
+ cqs.closeAllConnections(new SQLExceptionInfo
+ .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
+ .setMessage("Phoenix connection got closed due to
failover")
+
.setHaGroupInfo(haGroup.getGroupInfo().toString()));
+ LOG.info("Closed all connections to cluster {} for HA
group {}",
+ jdbcZKUrl, haGroup.getGroupInfo());
+ } finally {
+ if (cqs != null) {
+ //CQS is closed but it is not invalidated from global
cache in PhoenixDriver
+ //so that any new connection will get error instead of
creating a new CQS
+ LOG.info("Closing CQS after cluster '{}' becomes
STANDBY", zkUrl);
+ cqs.close();
+ LOG.info("Successfully closed CQS after cluster '{}'
becomes STANDBY",
+ zkUrl);
+ }
}
}
}
+
private void transitActive(HighAvailabilityGroup haGroup, String zkUrl)
throws SQLException {
// Invalidate CQS cache if any that has been closed but has not
been cleared
- LOG.info("invalidating cqs cache for zkUrl: " + zkUrl);
-
PhoenixDriver.INSTANCE.invalidateCache(haGroup.getGroupInfo().getJDBCUrl(zkUrl),
- haGroup.getProperties());
+ for (HAURLInfo haurlInfo :
HighAvailabilityGroup.URLS.get(haGroup.getGroupInfo())) {
+ String jdbcZKUrl = haGroup.getGroupInfo().getJDBCUrl(zkUrl,
haurlInfo);
+ LOG.info("invalidating cqs cache for zkUrl: " + jdbcZKUrl);
+ PhoenixDriver.INSTANCE.invalidateCache(jdbcZKUrl,
+ haGroup.getProperties());
+ }
}
},
PARALLEL {
@Override
- public Connection provide(HighAvailabilityGroup haGroup, Properties
info)
- throws SQLException {
+ public Connection provide(HighAvailabilityGroup haGroup, Properties
info,
+ HAURLInfo haURLInfo) throws SQLException {
List<Boolean> executorCapacities =
PhoenixHAExecutorServiceProvider.hasCapacity(info);
if (executorCapacities.contains(Boolean.TRUE)) {
ParallelPhoenixContext context =
new ParallelPhoenixContext(info, haGroup,
- PhoenixHAExecutorServiceProvider.get(info),
executorCapacities);
+ PhoenixHAExecutorServiceProvider.get(info),
+ executorCapacities, haURLInfo);
return new ParallelPhoenixConnection(context);
} else {
// TODO: Once we have operation/primary wait timeout use the
same
// Give regular connection or a failover connection?
LOG.warn("Falling back to single phoenix connection due to
resource constraints");
GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_FALLBACK_COUNTER.increment();
- return haGroup.connectActive(info);
+ return haGroup.connectActive(info, haURLInfo);
}
}
@Override
@@ -125,10 +137,12 @@ enum HighAvailabilityPolicy {
*
* @param haGroup The high availability (HA) group
* @param info Connection properties
+ * @param haurlInfo additional info of client provided url
* @return a JDBC connection
* @throws SQLException if fails to provide a connection
*/
- abstract Connection provide(HighAvailabilityGroup haGroup, Properties
info) throws SQLException;
+ abstract Connection provide(HighAvailabilityGroup haGroup, Properties
info, HAURLInfo haurlInfo)
+ throws SQLException;
/**
* Call-back function when a cluster role transition is detected in the
high availability group.
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
index 3184af7adf..c067027493 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
@@ -69,14 +69,15 @@ public class ParallelPhoenixConnection implements
PhoenixMonitoredConnection {
CompletableFuture<PhoenixConnection> futureConnection2;
public ParallelPhoenixConnection(ParallelPhoenixContext context) throws
SQLException {
this.context = context;
- LOG.trace("First Url: {} Second Url: {}",
context.getHaGroup().getGroupInfo().getJDBCUrl1(),
- context.getHaGroup().getGroupInfo().getJDBCUrl2());
+ LOG.trace("First Url: {} Second Url: {}",
context.getHaGroup().getGroupInfo().
+ getJDBCUrl1(context.getHaurlInfo()),
context.getHaGroup().getGroupInfo().
+ getJDBCUrl2(context.getHaurlInfo()));
futureConnection1 = context.chainOnConn1(() ->
getConnection(context.getHaGroup(),
- context.getHaGroup().getGroupInfo().getJDBCUrl1(),
- context.getProperties()));
+
context.getHaGroup().getGroupInfo().getJDBCUrl1(context.getHaurlInfo()),
+ context.getProperties(), context.getHaurlInfo()));
futureConnection2 = context.chainOnConn2(() ->
getConnection(context.getHaGroup(),
- context.getHaGroup().getGroupInfo().getJDBCUrl2(),
- context.getProperties()));
+
context.getHaGroup().getGroupInfo().getJDBCUrl2(context.getHaurlInfo()),
+ context.getProperties(), context.getHaurlInfo()));
// Ensure one connection is successful before returning
ParallelPhoenixUtil.INSTANCE.runFutures(Arrays.asList(futureConnection1,
futureConnection2), context, false);
@@ -91,9 +92,10 @@ public class ParallelPhoenixConnection implements
PhoenixMonitoredConnection {
ParallelPhoenixUtil.INSTANCE.runFutures(Arrays.asList(futureConnection1,
futureConnection2), context, false);
}
- private static PhoenixConnection getConnection(HighAvailabilityGroup
haGroup, String url, Properties properties) {
+ private static PhoenixConnection getConnection(HighAvailabilityGroup
haGroup, String url,
+ Properties properties,
HAURLInfo haurlInfo) {
try {
- return haGroup.connectToOneCluster(url, properties);
+ return haGroup.connectToOneCluster(url, properties, haurlInfo);
} catch (SQLException exception) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Failed to get a connection for
haGroup %s to %s", haGroup.toString(), url), exception);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
index 567abad2dd..7d496dd4c6 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
@@ -57,6 +57,7 @@ public class ParallelPhoenixContext {
private final Properties properties;
private final HighAvailabilityGroup haGroup;
+ private final HAURLInfo haurlInfo;
private final long operationTimeoutMs;
private volatile boolean isClosed = false;
@@ -72,12 +73,15 @@ public class ParallelPhoenixContext {
* @param executorCapacities Ordered list of executorCapacities
corresponding to executors. Null is interpreted as
* executors having capacity
*/
- ParallelPhoenixContext(Properties properties, HighAvailabilityGroup
haGroup,
List<PhoenixHAExecutorServiceProvider.PhoenixHAClusterExecutorServices>
executors, List<Boolean> executorCapacities) {
+ ParallelPhoenixContext(Properties properties, HighAvailabilityGroup
haGroup,
+
List<PhoenixHAExecutorServiceProvider.PhoenixHAClusterExecutorServices>
executors,
+ List<Boolean> executorCapacities, HAURLInfo
haurlInfo) {
Preconditions.checkNotNull(executors);
Preconditions.checkArgument(executors.size() >= 2, "Expected 2
executor pairs, one for each connection with a normal/close executor");
GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER.increment();
this.properties = properties;
this.haGroup = haGroup;
+ this.haurlInfo = haurlInfo;
this.parallelPhoenixMetrics = new ParallelPhoenixMetrics();
this.operationTimeoutMs = getOperationTimeoutMs(properties);
@@ -124,6 +128,10 @@ public class ParallelPhoenixContext {
return haGroup;
}
+ public HAURLInfo getHaurlInfo() {
+ return haurlInfo;
+ }
+
public boolean isAutoCommit() {
return Boolean.valueOf((String)
properties.getOrDefault(AUTO_COMMIT_ATTRIB, "false"));
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index f4de1ae779..2ea63fb694 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -135,16 +135,18 @@ public abstract class PhoenixEmbeddedDriver implements
Driver, SQLCloseable {
Properties augmentedInfo = PropertiesUtil.deepCopy(info);
augmentedInfo.putAll(getDefaultProps().asMap());
if (url.contains("|")) {
+ // Get HAURLInfo to pass it to connection creation
+ HAURLInfo haurlInfo = HighAvailabilityGroup.getUrlInfo(url,
augmentedInfo);
// High availability connection using two clusters
Optional<HighAvailabilityGroup> haGroup =
HighAvailabilityGroup.get(url, augmentedInfo);
if (haGroup.isPresent()) {
- return haGroup.get().connect(augmentedInfo);
+ return haGroup.get().connect(augmentedInfo, haurlInfo);
} else {
// If empty HA group is returned, fall back to single cluster.
url =
HighAvailabilityGroup.getFallbackCluster(url,
info).orElseThrow(
- () -> new SQLException(
- "HA group can not be initialized, fallback
to single cluster"));
+ () -> new SQLException(
+ "HA group can not be initialized,
fallback to single cluster"));
}
}
ConnectionQueryServices cqs = getConnectionQueryServices(url,
augmentedInfo);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
index 890529c68e..29c5f2f1b0 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc;
import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.URLS;
import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
import static
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
@@ -46,7 +47,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.FailoverSQLException;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
@@ -56,6 +60,7 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -592,6 +597,138 @@ public class FailoverPhoenixConnectionIT {
assertTrue(PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).isEmpty());
}
+ /**
+ * Test transit cluster role record which should affect all the principals
for a given HAGroup
+ */
+ @Test(timeout = 300000)
+ public void testAllConnectionsOfHAIsAffected() throws Exception {
+ Connection conn = createFailoverConnection();
+ PhoenixConnection wrappedConn = ((FailoverPhoenixConnection)
conn).getWrappedConnection();
+
+ //Create another connection with same params except different principal
+ //This should use same haGroup as default one and transiting that
haGroup should affect this conn as well.
+ String principal = RandomStringUtils.randomAlphabetic(5);
+ Connection conn2 =
DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal), clientProperties);
+ PhoenixConnection wrappedConn2 = ((FailoverPhoenixConnection)
conn2).getWrappedConnection();
+
+ // Following we create a new HA group and create a connection against
this HA group with default PRINCIPAL
+ String haGroupName2 = haGroup.getGroupInfo().getName() + "2";
+ CLUSTERS.initClusterRole(haGroupName2,
HighAvailabilityPolicy.FAILOVER);
+ Properties clientProperties2 = new Properties(clientProperties);
+ clientProperties2.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+ Connection conn3 =
DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties2);
+ PhoenixConnection wrappedConn3 = ((FailoverPhoenixConnection)
conn3).getWrappedConnection();
+
+ //Create another connection with haGroup2 with same principal as for
conn2 with haGroup, which should not be
+ //affected by transiting haGroup
+ Connection conn4 =
DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal),
clientProperties2);
+ PhoenixConnection wrappedConn4 = ((FailoverPhoenixConnection)
conn4).getWrappedConnection();
+
+ assertFalse(wrappedConn.isClosed());
+ assertFalse(wrappedConn2.isClosed());
+ assertFalse(wrappedConn3.isClosed());
+ assertFalse(wrappedConn4.isClosed());
+
+ CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY,
ClusterRole.ACTIVE);
+
+ assertTrue(wrappedConn.isClosed());
+ assertTrue(wrappedConn2.isClosed());
+ assertFalse(wrappedConn3.isClosed()); //only connection with haGroup
will be closed irrespective of principal
+ assertFalse(wrappedConn4.isClosed());
+
+ }
+
+ @Test(timeout = 300000)
+ public void testUserPrincipal() throws Exception {
+ Connection conn = createFailoverConnection(); //PRINCIPAL, haGroupName
+ FailoverPhoenixConnection fconn = (FailoverPhoenixConnection) conn;
+ ConnectionQueryServices cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(),
clientProperties);
+
+ String haGroupName2 = testName.getMethodName() +
RandomStringUtils.randomAlphabetic(3);;
+ CLUSTERS.initClusterRole(haGroupName2,
HighAvailabilityPolicy.FAILOVER);
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+ Connection conn2 =
DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties);
//PRINCIPAL,haGroupName2
+ FailoverPhoenixConnection fconn2 = (FailoverPhoenixConnection) conn2;
+ ConnectionQueryServices cqsi2 =
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(),
clientProperties);
+
+ Connection conn3 =
DriverManager.getConnection(CLUSTERS.getJdbcHAUrlWithoutPrincipal(),
clientProperties); //null,haGroupName2
+ FailoverPhoenixConnection fconn3 = (FailoverPhoenixConnection) conn3;
+ ConnectionQueryServices cqsi3 =
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.
+ getJdbcUrlWithoutPrincipal(CLUSTERS.getUrl1()),
clientProperties);
+
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+ String principal4 = RandomStringUtils.randomAlphabetic(5);
+ Connection conn4 =
DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal4),
clientProperties);//principal4, haGroupName
+ FailoverPhoenixConnection fconn4 = (FailoverPhoenixConnection) conn4;
+ ConnectionQueryServices cqsi4 =
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(principal4),
clientProperties);
+
+ //Check wrapped connection urls
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(),
fconn.getWrappedConnection().getURL());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(),
fconn2.getWrappedConnection().getURL());
+
Assert.assertEquals(CLUSTERS.getJdbcUrlWithoutPrincipal(CLUSTERS.getUrl1()),
fconn3.getWrappedConnection().getURL());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(principal4),
fconn4.getWrappedConnection().getURL());
+
+ //Check cqsi objects should be same with what we get from connections
+
Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL,cqsi.getUserName());
+ Assert.assertSame(cqsi,
fconn.getWrappedConnection().getQueryServices());
+
+
Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL,cqsi2.getUserName());
+ Assert.assertSame(cqsi2,
fconn2.getWrappedConnection().getQueryServices());
+
+ Assert.assertNull(cqsi3.getUserName());
+ Assert.assertSame(cqsi3,
fconn3.getWrappedConnection().getQueryServices());
+
+ Assert.assertEquals(principal4,cqsi4.getUserName());
+ Assert.assertSame(cqsi4,
fconn4.getWrappedConnection().getQueryServices());
+
+ }
+
+ @Test(timeout = 300000)
+ public void testHAGroupMappingsWithDifferentPrincipalsOnDifferentThreads()
throws Exception {
+ int numThreads = RandomUtils.nextInt(3, 5);
+ List<Thread> connectionThreads = new ArrayList<>(numThreads);
+ AtomicBoolean isPrincipalNull = new AtomicBoolean(false);
+ //Creating random number of connections one connection per thread with
different principal
+ //Including one connection will null principal all of them will be
using given haGroupName
+ //which is specific to test
+ for (int i = 0; i < numThreads; i++) {
+ isPrincipalNull.set((i + 1) % 3 == 0);
+ connectionThreads.add(new Thread(() -> {
+ try {
+ createConnectionWithRandomPrincipal(isPrincipalNull.get());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }));
+ }
+
+ //Create multiple connections with given principal
+ String principal = RandomStringUtils.randomAlphabetic(3);
+ int numConnectionsWithSamePrincipal = 3;
+ for (int i = 0; i < numConnectionsWithSamePrincipal; i++) {
+ connectionThreads.add(new Thread(() -> {
+ try {
+
DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal), clientProperties);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }));
+ }
+
+ for (Thread connectionThread : connectionThreads) {
+ connectionThread.start();
+ }
+
+ for (Thread connectionThread : connectionThreads) {
+ connectionThread.join();
+ }
+
+ //For the given ha group of current test the value in URLS set for
current haGroupInfo
+ //should be numThreads + 1 as all the connections created with same
principal should have
+ //one entry in map.
+ Assert.assertEquals(numThreads + 1,
URLS.get(haGroup.getGroupInfo()).size());
+ }
+
/**
* Helper method to verify that the failover connection has expected
mutation metrics.
*
@@ -636,4 +773,12 @@ public class FailoverPhoenixConnectionIT {
LOG.info("Got expected failover exception after connection is
closed.", e);
} // all other type of exception will fail this test.
}
+
+ private Connection createConnectionWithRandomPrincipal(boolean
isPrincipalNull) throws SQLException {
+ String principal = RandomStringUtils.randomAlphabetic(5);
+ if (isPrincipalNull) {
+ return
DriverManager.getConnection(CLUSTERS.getJdbcHAUrlWithoutPrincipal(),
clientProperties);
+ }
+ return DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal),
clientProperties);
+ }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
index f7a99ba92a..5f1b86fdaa 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
@@ -26,6 +26,7 @@ import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasic
import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
@@ -36,6 +37,7 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -76,6 +78,7 @@ public class HighAvailabilityGroupIT {
private String jdbcUrl;
/** Failover HA group for to test. */
private HighAvailabilityGroup haGroup;
+ private HAURLInfo haURLInfo;
/** HA Group name for this test. */
private String haGroupName;
@@ -106,6 +109,7 @@ public class HighAvailabilityGroupIT {
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
jdbcUrl = CLUSTERS.getJdbcHAUrl();
+ haURLInfo = HighAvailabilityGroup.getUrlInfo(jdbcUrl,
clientProperties);
haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties);
}
@@ -164,6 +168,185 @@ public class HighAvailabilityGroupIT {
}
}
+ /**
+ * Test HAGroup.get() method to get same HAGroups if we have different
principals only and different HAGroups
+ * if anything else in the key changes
+ * @throws Exception
+ */
+ @Test
+ public void testGetWithDifferentPrincipals() throws Exception {
+ //Client will get same HAGroup if we have difference in principal only
but JDBCURLs should have different
+ //principals
+ assertEquals(CLUSTERS.getJdbcUrl1(),
haGroup.getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haURLInfo));
+ assertEquals(CLUSTERS.getJdbcUrl2(),
haGroup.getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haURLInfo));
+ assertEquals(CLUSTERS.getJdbcHAUrl(), haGroup.getGroupInfo().
+ getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(),
CLUSTERS.getUrl2()), haURLInfo));
+
+ //Try creating new HAGroup with same params except Principal
+ Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+ try {
+ String principal = RandomStringUtils.randomAlphabetic(5);
+ String haUrl2 = CLUSTERS.getJdbcHAUrl(principal);
+ HAURLInfo haURLInfo2 = HighAvailabilityGroup.getUrlInfo(haUrl2,
clientProperties);
+ haGroup2 = HighAvailabilityGroup.get(haUrl2, clientProperties);
+ assertTrue(haGroup2.isPresent());
+ //We should get same HAGroup as we have mapping of <HAGroupName,
urls> -> HAGroup
+ assertSame(haGroup, haGroup2.get());
+
+ //URLs we are getting for haGroup2 should have newer principal
i.e. Current HAURLInfo should have new
+ //principal instead default PRINCIPAL
+ assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl1(), principal),
+
haGroup2.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haURLInfo2));
+ assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl2(), principal),
+
haGroup2.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haURLInfo2));
+ assertEquals(CLUSTERS.getJdbcHAUrl(principal),
haGroup2.get().getGroupInfo().
+ getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(),
CLUSTERS.getUrl2()), haURLInfo2));
+ } finally {
+ haGroup2.ifPresent(HighAvailabilityGroup::close);
+ }
+
+ // Client will get a different HighAvailabilityGroup when group name
is different and with same principal as
+ // default HAGroup
+ String haGroupName3 = testName.getMethodName() +
RandomStringUtils.randomAlphabetic(3);
+ CLUSTERS.initClusterRole(haGroupName3,
HighAvailabilityPolicy.FAILOVER);
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3);
+ Optional<HighAvailabilityGroup> haGroup3 = Optional.empty();
+ Optional<HighAvailabilityGroup> haGroup4 = Optional.empty();
+ try {
+ HAURLInfo haurlInfo3 = HighAvailabilityGroup.getUrlInfo(jdbcUrl,
clientProperties);
+ haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+ assertTrue(haGroup3.isPresent());
+ assertNotSame(haGroup, haGroup3.get());
+
+ assertNotSame(haGroup.getGroupInfo(),
haGroup3.get().getGroupInfo());
+
+ //URLs we are getting for haGroup3 should have same principal as
default PRINCIPAL.
+ assertEquals(CLUSTERS.getJdbcUrl1(),
+
haGroup3.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haurlInfo3));
+ assertEquals(CLUSTERS.getJdbcUrl2(),
+
haGroup3.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haurlInfo3));
+ assertEquals(CLUSTERS.getJdbcHAUrl(),
haGroup3.get().getGroupInfo().
+ getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(),
CLUSTERS.getUrl2()), haurlInfo3));
+
+ // should get same ha Group without principal
+ String haUrl4 = CLUSTERS.getJdbcHAUrlWithoutPrincipal();
+ HAURLInfo haURLInfo4 = HighAvailabilityGroup.getUrlInfo(haUrl4,
clientProperties);
+ haGroup4 = HighAvailabilityGroup.get(haUrl4, clientProperties);
+ assertTrue(haGroup4.isPresent());
+ assertNotSame(haGroup, haGroup4.get());
+ assertSame(haGroup3.get(), haGroup4.get());
+
+ assertNotSame(haGroup.getGroupInfo(),
haGroup4.get().getGroupInfo());
+ assertSame(haGroup3.get().getGroupInfo(),
haGroup4.get().getGroupInfo());
+ assertNotEquals(haurlInfo3, haURLInfo4);
+
+ } finally {
+ haGroup3.ifPresent(HighAvailabilityGroup::close);
+ haGroup4.ifPresent(HighAvailabilityGroup::close);
+ }
+
+ // Client will get the same HighAvailabilityGroup using the same
information as key again
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR,
haGroup.getGroupInfo().getName());
+ Optional<HighAvailabilityGroup> haGroup5 = Optional.empty();
+ try {
+ //Again using a random principal which should be used now for
generating jdbcUrls
+ String principal = RandomStringUtils.randomAlphabetic(5);
+ String haUrl5 = CLUSTERS.getJdbcHAUrl(principal);
+ HAURLInfo haURLInfo5 = HighAvailabilityGroup.getUrlInfo(haUrl5,
clientProperties);
+ haGroup5 = HighAvailabilityGroup.get(haUrl5, clientProperties);
+ assertTrue(haGroup5.isPresent());
+ assertSame(haGroup, haGroup5.get());
+
+ //URLs we are getting for haGroup4 should have newer principal
i.e. Current HAURLInfo should have new
+ //principal instead default PRINCIPAL
+ assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl1(), principal),
+
haGroup4.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haURLInfo5));
+ assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl2(), principal),
+
haGroup4.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haURLInfo5));
+ assertEquals(CLUSTERS.getJdbcHAUrl(principal),
haGroup4.get().getGroupInfo().
+ getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(),
CLUSTERS.getUrl2()), haURLInfo5));
+
+ } finally {
+ haGroup5.ifPresent(HighAvailabilityGroup::close);
+ }
+
+ }
+
+ @Test
+ public void testHAGroupMappings() throws Exception {
+
+ //Try creating new HAGroup with same params except Principal
+ Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+ try {
+ String principal = RandomStringUtils.randomAlphabetic(5);
+ String haUrl2 = CLUSTERS.getJdbcHAUrl(principal);
+ HAURLInfo haURLInfo2 = HighAvailabilityGroup.getUrlInfo(haUrl2,
clientProperties);
+ haGroup2 = HighAvailabilityGroup.get(haUrl2, clientProperties);
+ assertTrue(haGroup2.isPresent());
+ //We should get same HAGroup as we have mapping of <HAGroupName,
urls> -> HAGroup
+ assertSame(haGroup, haGroup2.get());
+ //We should have 2 values on URLS mapping for the given
haGroup/haGroup2.
+ assertEquals(2, URLS.get(haGroup.getGroupInfo()).size());
+ assertTrue(URLS.get(haGroup.getGroupInfo()).contains(haURLInfo2));
+
+ } finally {
+ haGroup2.ifPresent(HighAvailabilityGroup::close);
+ }
+
+ //Create 2 more different urls connecting to a different HAGroup
+ String haGroupName3 = testName.getMethodName() +
RandomStringUtils.randomAlphabetic(3);
+ CLUSTERS.initClusterRole(haGroupName3,
HighAvailabilityPolicy.FAILOVER);
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3);
+ Optional<HighAvailabilityGroup> haGroup3 = Optional.empty();
+ Optional<HighAvailabilityGroup> haGroup4 = Optional.empty();
+ try {
+ HAURLInfo haurlInfo3 = HighAvailabilityGroup.getUrlInfo(jdbcUrl,
clientProperties);
+ haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+ assertTrue(haGroup3.isPresent());
+ assertNotSame(haGroup, haGroup3.get());
+ assertNotSame(haGroup.getGroupInfo(),
haGroup3.get().getGroupInfo());
+ assertEquals(1, URLS.get(haGroup3.get().getGroupInfo()).size());
+
+
+ // should get same ha Group without principal
+ String haUrl4 = CLUSTERS.getJdbcHAUrlWithoutPrincipal();
+ HAURLInfo haURLInfo4 = HighAvailabilityGroup.getUrlInfo(haUrl4,
clientProperties);
+ haGroup4 = HighAvailabilityGroup.get(haUrl4, clientProperties);
+ assertTrue(haGroup4.isPresent());
+ assertNotSame(haGroup, haGroup4.get());
+ assertSame(haGroup3.get(), haGroup4.get());
+ assertEquals(2, URLS.get(haGroup4.get().getGroupInfo()).size());
+
+ assertNotEquals(haurlInfo3, haURLInfo4);
+
+ } finally {
+ haGroup3.ifPresent(HighAvailabilityGroup::close);
+ haGroup4.ifPresent(HighAvailabilityGroup::close);
+ }
+
+ // Client will get the same HighAvailabilityGroup using the same
information as key again
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR,
haGroup.getGroupInfo().getName());
+ Optional<HighAvailabilityGroup> haGroup5 = Optional.empty();
+ try {
+ String haUrl5 = CLUSTERS.getJdbcHAUrl();
+ HAURLInfo haURLInfo5 = HighAvailabilityGroup.getUrlInfo(haUrl5,
clientProperties);
+ haGroup5 = HighAvailabilityGroup.get(haUrl5, clientProperties);
+ assertTrue(haGroup5.isPresent());
+ assertSame(haGroup, haGroup5.get());
+
+ //haURLInfo5 should be same as global one and URLS mapping should
not change so
+ //set mapping for global HAGroupInfo should have 2 values
+ assertEquals(2, URLS.get(haGroup.getGroupInfo()).size());
+ assertTrue(URLS.get(haGroup.getGroupInfo()).contains(haURLInfo5));
+ assertEquals(haURLInfo5, haURLInfo);
+
+
+ } finally {
+ haGroup5.ifPresent(HighAvailabilityGroup::close);
+ }
+
+ }
+
/**
* Test that HA group should see latest version of cluster role record.
*/
@@ -317,7 +500,7 @@ public class HighAvailabilityGroupIT {
*/
@Test
public void testConnect() throws SQLException {
- Connection connection = haGroup.connect(clientProperties);
+ Connection connection = haGroup.connect(clientProperties, haURLInfo);
assertNotNull(connection);
assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
}
@@ -328,11 +511,11 @@ public class HighAvailabilityGroupIT {
@Test
public void testConnectToOneCluster() throws SQLException {
final String url = CLUSTERS.getJdbcUrl1();
- PhoenixConnection connection = haGroup.connectToOneCluster(url,
clientProperties);
+ PhoenixConnection connection = haGroup.connectToOneCluster(url,
clientProperties, haURLInfo);
assertEquals(url, connection.getURL());
try {
- haGroup.connectToOneCluster(null, clientProperties);
+ haGroup.connectToOneCluster(null, clientProperties, haURLInfo);
fail("Should have failed since null is not in any HA group");
} catch (Exception e) {
LOG.info("Got expected exception with invalid null host url", e);
@@ -341,7 +524,7 @@ public class HighAvailabilityGroupIT {
final String randomHostUrl = String.format("%s:%d",
RandomStringUtils.randomAlphabetic(4),
RandomUtils.nextInt(0,65536));
try {
- haGroup.connectToOneCluster(randomHostUrl, clientProperties);
+ haGroup.connectToOneCluster(randomHostUrl, clientProperties,
haURLInfo);
fail("Should have failed since '" + randomHostUrl + "' is not in
HA group " + haGroup);
} catch (IllegalArgumentException e) {
LOG.info("Got expected exception with invalid host url '{}'",
randomHostUrl, e);
@@ -351,7 +534,7 @@ public class HighAvailabilityGroupIT {
/**
* Test that it can connect to a given cluster in this HA group after ZK
service restarts.
*
- * Method {@link HighAvailabilityGroup#connectToOneCluster(String,
Properties)} is used by
+ * Method {@link HighAvailabilityGroup#connectToOneCluster(String,
Properties, HAURLInfo)} is used by
* Phoenix HA framework to connect to one specific HBase cluster in this
HA group. The cluster
* may not necessarily be in ACTIVE role. For example, parallel HA
connection needs to connect
* to both clusters. This tests that it can connect to a specific ZK
cluster after ZK restarts.
@@ -376,7 +559,7 @@ public class HighAvailabilityGroupIT {
doTestBasicOperationsWithConnection(conn, tableName, null);
}
// test with HA group to get connection to one cluster
- try (Connection conn = haGroup.connectToOneCluster(jdbcUrlToCluster1,
clientProperties)) {
+ try (Connection conn = haGroup.connectToOneCluster(jdbcUrlToCluster1,
clientProperties, haURLInfo)) {
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
}
}
@@ -386,9 +569,9 @@ public class HighAvailabilityGroupIT {
*/
@Test
public void testIsConnectionActive() throws Exception {
- PhoenixConnection conn1 =
haGroup.connectToOneCluster(CLUSTERS.getUrl1(), clientProperties);
+ PhoenixConnection conn1 =
haGroup.connectToOneCluster(CLUSTERS.getUrl1(), clientProperties, haURLInfo);
assertTrue(haGroup.isActive(conn1));
- PhoenixConnection conn2 =
haGroup.connectToOneCluster(CLUSTERS.getUrl2(), clientProperties);
+ PhoenixConnection conn2 =
haGroup.connectToOneCluster(CLUSTERS.getUrl2(), clientProperties, haURLInfo);
assertFalse(haGroup.isActive(conn2));
CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY,
ClusterRole.ACTIVE);
@@ -418,7 +601,7 @@ public class HighAvailabilityGroupIT {
public void testCanConnectWhenStandbyHBaseClusterDown() throws Exception {
doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> {
// HA group is already initialized
- Connection connection = haGroup.connect(clientProperties);
+ Connection connection = haGroup.connect(clientProperties,
haURLInfo);
assertNotNull(connection);
assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
});
@@ -437,7 +620,7 @@ public class HighAvailabilityGroupIT {
HighAvailabilityGroup.CURATOR_CACHE.invalidateAll();
// HA group is already initialized
- Connection connection = haGroup.connect(clientProperties);
+ Connection connection = haGroup.connect(clientProperties,
haURLInfo);
assertNotNull(connection);
assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
});
@@ -459,11 +642,12 @@ public class HighAvailabilityGroupIT {
CLUSTERS.initClusterRole(haGroupName2,
HighAvailabilityPolicy.FAILOVER);
Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
try {
+ HAURLInfo haURLInfo =
HighAvailabilityGroup.getUrlInfo(jdbcUrl, clientProperties);
haGroup2 = HighAvailabilityGroup.get(jdbcUrl,
clientProperties);
assertTrue(haGroup2.isPresent());
assertNotSame(haGroup2.get(), haGroup);
// get a new connection in this new HA group; should be
pointing to ACTIVE cluster1
- try (Connection connection =
haGroup2.get().connect(clientProperties)) {
+ try (Connection connection =
haGroup2.get().connect(clientProperties, haURLInfo)) {
assertNotNull(connection);
assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
}
@@ -489,11 +673,12 @@ public class HighAvailabilityGroupIT {
doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
try {
+ HAURLInfo haURLInfo = HighAvailabilityGroup.getUrlInfo(jdbcUrl,
clientProperties);
haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
assertTrue(haGroup2.isPresent());
assertNotSame(haGroup2.get(), haGroup);
// get a new connection in this new HA group; should be
pointing to ACTIVE cluster1
- Connection connection =
haGroup2.get().connect(clientProperties);
+ Connection connection =
haGroup2.get().connect(clientProperties, haURLInfo);
assertNotNull(connection);
assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
} finally {
@@ -509,7 +694,7 @@ public class HighAvailabilityGroupIT {
public void testCanNotEstablishConnectionWhenActiveHBaseClusterDown()
throws Exception {
doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster1(), () -> {
try {
- haGroup.connectActive(clientProperties);
+ haGroup.connectActive(clientProperties, haURLInfo);
fail("Should have failed because ACTIVE HBase cluster is
down.");
} catch (SQLException e) {
LOG.info("Got expected exception when ACTIVE HBase cluster is
down", e);
@@ -529,7 +714,7 @@ public class HighAvailabilityGroupIT {
public void testConnectActiveWhenActiveZKClusterRestarts() throws
Exception {
doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
try {
- haGroup.connectActive(clientProperties);
+ haGroup.connectActive(clientProperties, haURLInfo);
fail("Should have failed because of ACTIVE ZK cluster is
down.");
} catch (SQLException e) {
LOG.info("Got expected exception when ACTIVE ZK cluster is
down", e);
@@ -537,7 +722,7 @@ public class HighAvailabilityGroupIT {
}
});
- try (Connection conn = haGroup.connectActive(clientProperties)) {
+ try (Connection conn = haGroup.connectActive(clientProperties,
haURLInfo)) {
assertNotNull(conn);
LOG.info("Successfully connect to HA group {} after restarting
ACTIVE ZK", haGroup);
} // all other exceptions will fail the test
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
index 304b048e49..b8bd5c61fa 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
@@ -90,6 +90,7 @@ public class HighAvailabilityGroupTestIT {
private final ClusterRoleRecord record = mock(ClusterRoleRecord.class);
/** The HA group to test. This is spied but not mocked. */
private HighAvailabilityGroup haGroup;
+ private HAURLInfo haURLInfo;
@Rule
public final TestName testName = new TestName();
@@ -129,6 +130,7 @@ public class HighAvailabilityGroupTestIT {
HAGroupInfo haGroupInfo = new HAGroupInfo(haGroupName, ZK1, ZK2);
haGroup = spy(new HighAvailabilityGroup(haGroupInfo, clientProperties,
record, READY));
+ haURLInfo = spy(new HAURLInfo(haGroupName));
}
/**
@@ -138,18 +140,19 @@ public class HighAvailabilityGroupTestIT {
*/
@Test
public void testConnect() throws SQLException {
- final Connection conn = haGroup.connect(clientProperties);
+ final Connection conn = haGroup.connect(clientProperties, haURLInfo);
assertTrue(conn instanceof FailoverPhoenixConnection);
FailoverPhoenixConnection failoverConnection =
conn.unwrap(FailoverPhoenixConnection.class);
assertNotNull(failoverConnection);
// Verify that the failover should have connected to ACTIVE cluster
once
- verify(haGroup, times(1)).connectActive(any(Properties.class));
- verify(haGroup, times(1)).connectToOneCluster(anyString(),
eq(clientProperties));
+ verify(haGroup, times(1)).connectActive(any(Properties.class),
any(HAURLInfo.class));
+ verify(haGroup, times(1)).connectToOneCluster(anyString(),
+ eq(clientProperties), any(HAURLInfo.class));
verify(DRIVER, atLeastOnce()).getConnectionQueryServices(anyString(),
eq(clientProperties));
when(record.getPolicy()).thenReturn(HighAvailabilityPolicy.PARALLEL);
// get a new connection from this HA group
- final Connection conn2 = haGroup.connect(clientProperties);
+ final Connection conn2 = haGroup.connect(clientProperties, haURLInfo);
assertTrue(conn2 instanceof ParallelPhoenixConnection);
}
@@ -161,7 +164,7 @@ public class HighAvailabilityGroupTestIT {
final HAGroupInfo info = haGroup.getGroupInfo();
haGroup = spy(new HighAvailabilityGroup(info, clientProperties,
record, UNINITIALIZED));
try {
- haGroup.connect(clientProperties);
+ haGroup.connect(clientProperties, haURLInfo);
fail("Should have failed since HA group is not READY!");
} catch (SQLException e) {
LOG.info("Got expected exception", e);
@@ -178,11 +181,11 @@ public class HighAvailabilityGroupTestIT {
public void testConnectToOneCluster() throws SQLException {
// test with JDBC string
final String jdbcString = String.format("jdbc:phoenix:%s", ZK1);
- haGroup.connectToOneCluster(jdbcString, clientProperties);
+ haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo);
verify(DRIVER, times(1)).getConnectionQueryServices(anyString(),
eq(clientProperties));
// test with only ZK string
- haGroup.connectToOneCluster(ZK1, clientProperties);
+ haGroup.connectToOneCluster(ZK1, clientProperties, haURLInfo);
verify(DRIVER, times(2)).getConnectionQueryServices(anyString(),
eq(clientProperties));
}
@@ -195,7 +198,7 @@ public class HighAvailabilityGroupTestIT {
// test with JDBC string and UNKNOWN cluster role
final String jdbcString = String.format("jdbc:phoenix:%s", ZK1);
try {
- haGroup.connectToOneCluster(jdbcString, clientProperties);
+ haGroup.connectToOneCluster(jdbcString, clientProperties,
haURLInfo);
fail("Should have failed because cluster is in UNKNOWN role");
} catch (SQLException e) { // expected exception
assertEquals(SQLExceptionCode.HA_CLUSTER_CAN_NOT_CONNECT.getErrorCode(),
@@ -206,7 +209,7 @@ public class HighAvailabilityGroupTestIT {
// test with only ZK string and OFFLINE cluster role
when(record.getRole(eq(ZK1))).thenReturn(ClusterRole.OFFLINE);
try {
- haGroup.connectToOneCluster(jdbcString, clientProperties);
+ haGroup.connectToOneCluster(jdbcString, clientProperties,
haURLInfo);
fail("Should have failed because cluster is in OFFLINE role");
} catch (SQLException e) { // expected exception
assertEquals(SQLExceptionCode.HA_CLUSTER_CAN_NOT_CONNECT.getErrorCode(),
@@ -221,7 +224,7 @@ public class HighAvailabilityGroupTestIT {
@Test (expected = IllegalArgumentException.class)
public void testConnectToOneClusterShouldFailWithNonHAJdbcString() throws
SQLException {
final String jdbcString = "jdbc:phoenix:dummyhost";
- haGroup.connectToOneCluster(jdbcString, clientProperties);
+ haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo);
verify(DRIVER, never()).getConnectionQueryServices(anyString(),
eq(clientProperties));
}
@@ -233,7 +236,7 @@ public class HighAvailabilityGroupTestIT {
// test with JDBC string
final String hosts = "zk1-2,zk1-1:2181:/hbase";
final String jdbcString = String.format("jdbc:phoenix+zk:%s", hosts);
- haGroup.connectToOneCluster(jdbcString, clientProperties);
+ haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo);
verify(DRIVER,
times(1)).getConnectionQueryServices(eq(String.format("jdbc:phoenix+zk:%s",ZK1)),
eq(clientProperties));
}
@@ -260,7 +263,7 @@ public class HighAvailabilityGroupTestIT {
@Test
public void testIsConnectionActive() throws SQLException {
assertFalse(haGroup.isActive(null));
- PhoenixConnection connection = haGroup.connectToOneCluster(ZK1,
clientProperties);
+ PhoenixConnection connection = haGroup.connectToOneCluster(ZK1,
clientProperties, haURLInfo);
assertTrue(haGroup.isActive(connection));
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index 526b83e292..27b627753f 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -350,9 +350,23 @@ public class HighAvailabilityTestingUtility {
return getJdbcUrl(String.format("[%s|%s]", url1, url2));
}
+ public String getJdbcHAUrl(String principal) {
+ return getJdbcUrl(String.format("[%s|%s]", url1, url2), principal);
+ }
+
+ public String getJdbcHAUrlWithoutPrincipal() {
+ return getJdbcUrlWithoutPrincipal(String.format("[%s|%s]", url1,
url2));
+ }
+
+
public String getJdbcUrl1() {
return getJdbcUrl(url1);
}
+
+ public String getJdbcUrl1(String principal) {
+ return getJdbcUrl(url1, principal);
+ }
+
public String getJdbcUrl2() {
return getJdbcUrl(url2);
}
@@ -361,6 +375,14 @@ public class HighAvailabilityTestingUtility {
return String.format("jdbc:phoenix+zk:%s:%s", zkUrl, PRINCIPAL);
}
+ public String getJdbcUrl(String zkUrl, String principal) {
+ return String.format("jdbc:phoenix+zk:%s:%s", zkUrl, principal);
+ }
+
+ public String getJdbcUrlWithoutPrincipal(String zkUrl) {
+ return String.format("jdbc:phoenix+zk:%s", zkUrl);
+ }
+
public String getUrl1() {
return url1;
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 5772da8f9e..106ccfd9c0 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -138,24 +138,25 @@ public class ParallelPhoenixConnectionIT {
try (Connection conn = getParallelConnection()) {
ParallelPhoenixConnection pr =
conn.unwrap(ParallelPhoenixConnection.class);
ParallelPhoenixContext context = pr.getContext();
+ HAURLInfo haurlInfo = context.getHaurlInfo();
HighAvailabilityGroup.HAGroupInfo group =
context.getHaGroup().getGroupInfo();
if (CLUSTERS.getUrl1().compareTo(CLUSTERS.getUrl2()) <= 0) {
- Assert.assertEquals(CLUSTERS.getJdbcUrl1(),
group.getJDBCUrl1());
- Assert.assertEquals(CLUSTERS.getJdbcUrl2(),
group.getJDBCUrl2());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(),
group.getJDBCUrl1(haurlInfo));
+ Assert.assertEquals(CLUSTERS.getJdbcUrl2(),
group.getJDBCUrl2(haurlInfo));
} else {
- Assert.assertEquals(CLUSTERS.getJdbcUrl2(),
group.getJDBCUrl1());
- Assert.assertEquals(CLUSTERS.getJdbcUrl1(),
group.getJDBCUrl2());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl2(),
group.getJDBCUrl1(haurlInfo));
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(),
group.getJDBCUrl2(haurlInfo));
}
ConnectionQueryServices cqsi;
// verify connection#1
- cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl1(),
clientProperties);
+ cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl1(haurlInfo),
clientProperties);
Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL,
cqsi.getUserName());
PhoenixConnection pConn = pr.getFutureConnection1().get();
ConnectionQueryServices cqsiFromConn = pConn.getQueryServices();
Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL,
cqsiFromConn.getUserName());
Assert.assertTrue(cqsi == cqsiFromConn);
// verify connection#2
- cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl2(),
clientProperties);
+ cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl2(haurlInfo),
clientProperties);
Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL,
cqsi.getUserName());
pConn = pr.getFutureConnection2().get();
cqsiFromConn = pConn.getQueryServices();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java
index 8ee3d028bd..dcf0000532 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java
@@ -59,15 +59,18 @@ public class FailoverPhoenixConnectionTest {
@Mock HighAvailabilityGroup haGroup;
final HAGroupInfo haGroupInfo = new HAGroupInfo("fake", "zk1", "zk2");
+ final HAURLInfo haURLInfo = new HAURLInfo("fake");
+ FailoverPhoenixContext context;
FailoverPhoenixConnection failoverConnection; // this connection itself is
not mocked or spied.
+
@Before
public void init() throws SQLException {
MockitoAnnotations.initMocks(this);
when(haGroup.getGroupInfo()).thenReturn(haGroupInfo);
-
when(haGroup.connectActive(any(Properties.class))).thenReturn(connection1);
-
- failoverConnection = new FailoverPhoenixConnection(haGroup, new
Properties());
+ when(haGroup.connectActive(any(Properties.class),
any(HAURLInfo.class))).thenReturn(connection1);
+ context = new FailoverPhoenixContext(new Properties(), haGroup,
haURLInfo);
+ failoverConnection = new FailoverPhoenixConnection(context);
}
/**
@@ -92,7 +95,7 @@ public class FailoverPhoenixConnectionTest {
@Test
public void testFailover() throws SQLException {
// Make HAGroup return a different phoenix connection when it gets
called next time
-
when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2);
+ when(haGroup.connectActive(any(Properties.class),
any(HAURLInfo.class))).thenReturn(connection2);
// explicit call failover
failoverConnection.failover(1000L);
@@ -128,7 +131,7 @@ public class FailoverPhoenixConnectionTest {
public void testActiveFailoverIsNoOp() throws SQLException {
when(haGroup.isActive(connection1)).thenReturn(true);
// Make HAGroup return a different phoenix connection when it gets
called next time
-
when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2);
+ when(haGroup.connectActive(any(Properties.class),
any(HAURLInfo.class))).thenReturn(connection2);
failoverConnection.failover(1000L);
@@ -145,11 +148,12 @@ public class FailoverPhoenixConnectionTest {
Properties properties = new Properties();
properties.setProperty(FailoverPolicy.PHOENIX_HA_FAILOVER_POLICY_ATTR,
FailoverPolicy.FailoverToActivePolicy.NAME);
- failoverConnection = new FailoverPhoenixConnection(haGroup,
properties);
+ FailoverPhoenixContext context = new
FailoverPhoenixContext(properties, haGroup, haURLInfo);
+ failoverConnection = new FailoverPhoenixConnection(context);
LOG.info("Close the wrapped phoenix connection due to failover...");
// Make HAGroup return a different phoenix connection when it gets
called next time
-
when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2);
+ when(haGroup.connectActive(any(Properties.class),
any(HAURLInfo.class))).thenReturn(connection2);
// Mimic wrapped phoenix connection gets closed by HA group
doThrow(new FailoverSQLException("", "", new
Exception())).when(connection1).commit();
@@ -204,8 +208,8 @@ public class FailoverPhoenixConnectionTest {
@Test
public void testCheckConnection() throws SQLException {
// Make the wrapped phoenix connection null. This could happen if
HAGroup is failing.
- when(haGroup.connectActive(any(Properties.class))).thenReturn(null);
- failoverConnection = new FailoverPhoenixConnection(haGroup, new
Properties());
+ when(haGroup.connectActive(any(Properties.class),
any(HAURLInfo.class))).thenReturn(null);
+ failoverConnection = new FailoverPhoenixConnection(context);
assertNull(failoverConnection.getWrappedConnection());
try {
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
index 51d89728bf..e6371308b6 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
@@ -73,7 +73,7 @@ public class ParallelPhoenixConnectionFailureTest extends
BaseTest {
new ParallelPhoenixContext(new Properties(),
Mockito.mock(HighAvailabilityGroup.class),
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(),
- null);
+ null, Mockito.mock(HAURLInfo.class));
ParallelPhoenixConnection parallelConn =
new ParallelPhoenixConnection(context,
CompletableFuture.completedFuture(connSpy1),
CompletableFuture.completedFuture(connSpy2));
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java
index cdc8c2ddb5..4c1c4e8553 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java
@@ -53,7 +53,7 @@ public class ParallelPhoenixConnectionTest {
@Before
public void init() throws SQLException {
context = new ParallelPhoenixContext(new Properties(),
Mockito.mock(HighAvailabilityGroup.class),
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
Mockito.mock(HAURLInfo.class));
parallelPhoenixConnection = new
ParallelPhoenixConnection(context,CompletableFuture.completedFuture(connection1),CompletableFuture.completedFuture(connection2));
}
@@ -185,7 +185,8 @@ public class ParallelPhoenixConnectionTest {
"1000");
ParallelPhoenixContext context =
new ParallelPhoenixContext(properties,
Mockito.mock(HighAvailabilityGroup.class),
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
+ Mockito.mock(HAURLInfo.class));
CountDownLatch cdl = new CountDownLatch(1);
CompletableFuture<PhoenixConnection> futureConnection1 =
CompletableFuture.supplyAsync(getDelayConnectionSupplier(cdl, connection1));
@@ -212,7 +213,8 @@ public class ParallelPhoenixConnectionTest {
"1000");
ParallelPhoenixContext context =
new ParallelPhoenixContext(properties,
Mockito.mock(HighAvailabilityGroup.class),
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
+ Mockito.mock(HAURLInfo.class));
CountDownLatch cdl = new CountDownLatch(1);
CompletableFuture<PhoenixConnection> futureConnection1 =
CompletableFuture.completedFuture(connection1);
@@ -239,7 +241,8 @@ public class ParallelPhoenixConnectionTest {
"1000");
ParallelPhoenixContext context =
new ParallelPhoenixContext(properties,
Mockito.mock(HighAvailabilityGroup.class),
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
+ Mockito.mock(HAURLInfo.class));
CountDownLatch cdl1 = new CountDownLatch(1);
CompletableFuture<PhoenixConnection> futureConnection1 =
CompletableFuture.supplyAsync(getDelayConnectionSupplier(cdl1, connection1));
@@ -346,7 +349,8 @@ public class ParallelPhoenixConnectionTest {
"1000");
ParallelPhoenixContext context =
new ParallelPhoenixContext(properties,
Mockito.mock(HighAvailabilityGroup.class),
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
+ Mockito.mock(HAURLInfo.class));
parallelPhoenixConnection =
new ParallelPhoenixConnection(context,
CompletableFuture.completedFuture(connection1),
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java
index 1d0bc2c898..cd45b619bf 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java
@@ -70,7 +70,9 @@ public class ParallelPhoenixContextTest {
ParallelPhoenixContext context =
new ParallelPhoenixContext(new Properties(),
Mockito.mock(HighAvailabilityGroup.class),
-
Lists.newArrayList(Mockito.mock(PhoenixHAExecutorServiceProvider.PhoenixHAClusterExecutorServices.class)),
null);
+
Lists.newArrayList(Mockito.mock(PhoenixHAExecutorServiceProvider.
+ PhoenixHAClusterExecutorServices.class)),
null,
+ Mockito.mock(HAURLInfo.class));
fail("Should not construct with less than 2 ThreadPools");
} catch (IllegalArgumentException e) {
}
@@ -85,7 +87,8 @@ public class ParallelPhoenixContextTest {
Mockito.mock(Properties.class),
Mockito.mock(ClusterRoleRecord.class),
HighAvailabilityGroup.State.READY),
- executorList, Lists.newArrayList(Boolean.FALSE,
Boolean.TRUE));
+ executorList, Lists.newArrayList(Boolean.FALSE,
Boolean.TRUE),
+ Mockito.mock(HAURLInfo.class));
CompletableFuture<Boolean> future1 = context.chainOnConn1(() -> true);
assertTrue(future1.isCompletedExceptionally());
assertEquals(0, ((TrackingThreadPoolExecutor)
executorList.get(0).getExecutorService()).tasksExecuted.get());
@@ -105,7 +108,8 @@ public class ParallelPhoenixContextTest {
Mockito.mock(Properties.class),
Mockito.mock(ClusterRoleRecord.class),
HighAvailabilityGroup.State.READY),
- executorList, Lists.newArrayList(Boolean.TRUE,
Boolean.FALSE));
+ executorList, Lists.newArrayList(Boolean.TRUE,
Boolean.FALSE),
+ Mockito.mock(HAURLInfo.class));
CompletableFuture<Boolean> future1 = context.chainOnConn1(() -> true);
assertTrue(future1.get());
assertEquals(1, ((TrackingThreadPoolExecutor)
executorList.get(0).getExecutorService()).tasksExecuted.get());
@@ -121,7 +125,8 @@ public class ParallelPhoenixContextTest {
ParallelPhoenixContext context =
new ParallelPhoenixContext(new Properties(),
Mockito.mock(HighAvailabilityGroup.class),
executorList,
- Lists.newArrayList(Boolean.TRUE, Boolean.TRUE));
+ Lists.newArrayList(Boolean.TRUE, Boolean.TRUE),
+ Mockito.mock(HAURLInfo.class));
CompletableFuture<Boolean> future1 = context.chainOnConn1(() -> true);
assertTrue(future1.get());
assertEquals(1, ((TrackingThreadPoolExecutor)
executorList.get(0).getExecutorService()).tasksExecuted.get());
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java
index dd6ccb7a1d..dcea2ad8e6 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java
@@ -55,7 +55,8 @@ public class ParallelPhoenixNullComparingResultSetTest {
Mockito.mock(Properties.class),
Mockito.mock(ClusterRoleRecord.class),
HighAvailabilityGroup.State.READY),
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
+ Mockito.mock(HAURLInfo.class));
rs1 = Mockito.mock(ResultSet.class);
rs2 = Mockito.mock(ResultSet.class);
completableRs1 = CompletableFuture.completedFuture(rs1);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java
index 1c157d025b..a5e1a4333f 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java
@@ -47,7 +47,8 @@ public class ParallelPhoenixPreparedStatementTest {
@Before
public void init() throws Exception {
context = new ParallelPhoenixContext(new Properties(),
Mockito.mock(HighAvailabilityGroup.class),
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
+ Mockito.mock(HAURLInfo.class));
statement1 = Mockito.mock(PhoenixMonitoredPreparedStatement.class);
statement2 = Mockito.mock(PhoenixMonitoredPreparedStatement.class);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java
index 3e9a7e0f25..9e967e1ab6 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java
@@ -53,7 +53,8 @@ public class ParallelPhoenixResultSetTest {
new ParallelPhoenixContext(new Properties(), null,
HighAvailabilityTestingUtility
.getListOfSingleThreadExecutorServices(),
- null),
+ null,
+ Mockito.mock(HAURLInfo.class)),
completableRs1, completableRs2);
}
@@ -109,7 +110,8 @@ public class ParallelPhoenixResultSetTest {
new ParallelPhoenixContext(new Properties(), null,
HighAvailabilityTestingUtility
.getListOfSingleThreadExecutorServices(),
- null),
+ null,
+ Mockito.mock(HAURLInfo.class)),
completableRs1, completableRs2);
resultSet.next();
@@ -149,7 +151,8 @@ public class ParallelPhoenixResultSetTest {
new ParallelPhoenixContext(new Properties(), null,
HighAvailabilityTestingUtility
.getListOfSingleThreadExecutorServices(),
- null),
+ null,
+ Mockito.mock(HAURLInfo.class)),
completableRs1, completableRs2);
resultSet.next();
@@ -189,7 +192,8 @@ public class ParallelPhoenixResultSetTest {
new ParallelPhoenixContext(new Properties(), null,
HighAvailabilityTestingUtility
.getListOfSingleThreadExecutorServices(),
- null),
+ null,
+ Mockito.mock(HAURLInfo.class)),
completableRs1, completableRs2);
resultSet.next();
@@ -250,7 +254,8 @@ public class ParallelPhoenixResultSetTest {
new ParallelPhoenixContext(new Properties(), null,
HighAvailabilityTestingUtility
.getListOfSingleThreadExecutorServices(),
- null),
+ null,
+ Mockito.mock(HAURLInfo.class)),
completableRs1, completableRs2);
//run next in the background
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java
index 29d672a4e2..726fe04995 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java
@@ -46,7 +46,8 @@ public class ParallelPhoenixUtilTest {
private static final ParallelPhoenixContext context =
new ParallelPhoenixContext(new Properties(), null,
-
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null,
+ Mockito.mock(HAURLInfo.class));
@Test
public void getAnyOfNonExceptionallySingleFutureTest() throws Exception {
@@ -114,7 +115,7 @@ public class ParallelPhoenixUtilTest {
ParallelPhoenixContext ctx =
new ParallelPhoenixContext(props, null,
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(),
- null);
+ null, Mockito.mock(HAURLInfo.class));
long startTime = EnvironmentEdgeManager.currentTime();
try {
util.getAnyOfNonExceptionally(futures, ctx);