This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 21f4532ba3 PHOENIX-7193 Fix cluster override for mapreduce jobs for
non-ZK registries
21f4532ba3 is described below
commit 21f4532ba3781d653ac66f1ba03b579dc41f8641
Author: Istvan Toth <[email protected]>
AuthorDate: Wed Jan 31 15:56:04 2024 +0100
PHOENIX-7193 Fix cluster override for mapreduce jobs for non-ZK registries
---
.../phoenix/mapreduce/PhoenixOutputFormat.java | 5 +-
.../phoenix/mapreduce/PhoenixRecordWriter.java | 2 +-
.../phoenix/mapreduce/util/ConnectionUtil.java | 89 ++++++----------------
.../mapreduce/util/PhoenixConfigurationUtil.java | 79 ++++++++++++++++++-
.../apache/phoenix/index/IndexUpgradeToolTest.java | 15 +++-
.../mapreduce/PhoenixMultiViewInputFormatTest.java | 29 ++++---
.../util/PhoenixConfigurationUtilTest.java | 24 +++---
.../apache/phoenix/query/ConnectionlessTest.java | 6 +-
.../org/apache/phoenix/util/QueryUtilTest.java | 5 ++
9 files changed, 153 insertions(+), 101 deletions(-)
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
index 055ce1f93c..23847cb397 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -38,14 +38,13 @@ import org.slf4j.LoggerFactory;
*/
public class PhoenixOutputFormat <T extends DBWritable> extends
OutputFormat<NullWritable,T> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixOutputFormat.class);
- private final Set<String> propsToIgnore;
public PhoenixOutputFormat() {
this(Collections.<String>emptySet());
}
+ // FIXME Never used, and the ignore feature didn't work anyway
public PhoenixOutputFormat(Set<String> propsToIgnore) {
- this.propsToIgnore = propsToIgnore;
}
@Override
@@ -63,7 +62,7 @@ public class PhoenixOutputFormat <T extends DBWritable>
extends OutputFormat<Nul
@Override
public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext
context) throws IOException, InterruptedException {
try {
- return new PhoenixRecordWriter<T>(context.getConfiguration(),
propsToIgnore);
+ return new PhoenixRecordWriter<T>(context.getConfiguration());
} catch (SQLException e) {
LOGGER.error("Error calling PhoenixRecordWriter " +
e.getMessage());
throw new RuntimeException(e);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 6f5b84e366..14e986c159 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -54,7 +54,7 @@ public class PhoenixRecordWriter<T extends DBWritable>
extends RecordWriter<Nul
public PhoenixRecordWriter(final Configuration configuration, Set<String>
propsToIgnore) throws SQLException {
Connection connection = null;
try {
- connection =
ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration,
propsToIgnore);
+ connection = ConnectionUtil.getOutputConnection(configuration);
this.batchSize =
PhoenixConfigurationUtil.getBatchSize(configuration);
final String upsertQuery =
PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.statement = connection.prepareStatement(upsertQuery);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index ccd55fd059..7b7d8431be 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -20,30 +20,25 @@ package org.apache.phoenix.mapreduce.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.util.Collections;
import java.util.Properties;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
/**
* Utility class to return a {@link Connection} .
*/
public class ConnectionUtil {
- private static String TEST_PARAM =
- PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR +
PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-
/**
* Retrieve the configured input Connection.
* @param conf configuration containing connection information
* @return the configured input connection
*/
public static Connection getInputConnection(final Configuration conf)
throws SQLException {
+ Preconditions.checkNotNull(conf);
return getInputConnection(conf, new Properties());
}
@@ -55,22 +50,17 @@ public class ConnectionUtil {
*/
public static Connection getInputConnection(final Configuration conf,
final Properties props)
throws SQLException {
- Preconditions.checkNotNull(conf);
- String zkQuorumOverride =
PhoenixConfigurationUtil.getInputClusterZkQuorum(conf);
- if (zkQuorumOverride != null) {
- return DriverManager.getConnection("jdbc:phoenix+zk:" +
zkQuorumOverride,
- PropertiesUtil.combineProperties(props, conf));
- } else {
- // FIXME find some better way to get tests working
- String zkQuorumForTest =
PhoenixConfigurationUtil.getZKQuorum(conf);
- if (zkQuorumForTest != null &&
(zkQuorumForTest.contains(TEST_PARAM)
- || zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS)))
{
- return DriverManager.getConnection("jdbc:phoenix+zk:" +
zkQuorumForTest,
- PropertiesUtil.combineProperties(props, conf));
- }
- return DriverManager.getConnection("jdbc:phoenix",
- PropertiesUtil.combineProperties(props, conf));
+ String inputQuorum = PhoenixConfigurationUtil.getInputCluster(conf);
+ if (inputQuorum != null) {
+ // This will not override the quorum set with setInputClusterUrl
+ Properties copyProps = PropertiesUtil.deepCopy(props);
+ copyProps.setProperty(HConstants.CLIENT_ZOOKEEPER_QUORUM,
inputQuorum);
+ return DriverManager.getConnection(
+ PhoenixConfigurationUtil.getInputClusterUrl(conf),
+ PropertiesUtil.combineProperties(copyProps, conf));
}
+ return
DriverManager.getConnection(PhoenixConfigurationUtil.getInputClusterUrl(conf),
+ PropertiesUtil.combineProperties(props, conf));
}
/**
@@ -82,16 +72,6 @@ public class ConnectionUtil {
return getOutputConnection(conf, new Properties());
}
- /**
- * Create the configured output Connection.
- * @param conf configuration containing the connection information
- * @return the configured output connection
- */
- public static Connection getOutputConnectionWithoutTheseProps(final
Configuration conf,
- Set<String> ignoreTheseProps) throws SQLException {
- return getOutputConnection(conf, new Properties(), ignoreTheseProps);
- }
-
/**
* Create the configured output Connection.
* @param conf configuration containing the connection information
@@ -100,42 +80,17 @@ public class ConnectionUtil {
*/
public static Connection getOutputConnection(final Configuration conf,
Properties props)
throws SQLException {
- return getOutputConnection(conf, props,
Collections.<String>emptySet());
- }
-
- public static Connection getOutputConnection(final Configuration conf,
Properties props,
- Set<String> withoutTheseProps) throws SQLException {
Preconditions.checkNotNull(conf);
- String zkQuorumOverride =
PhoenixConfigurationUtil.getOutputClusterZkQuorum(conf);
- if (zkQuorumOverride != null) {
- return DriverManager.getConnection("jdbc:phoenix+zk:" +
zkQuorumOverride,
- PropertiesUtil.combineProperties(props, conf));
- } else {
- // FIXME find some better way to get tests working
- String zkQuorumForTest =
PhoenixConfigurationUtil.getZKQuorum(conf);
- if (zkQuorumForTest != null &&
(zkQuorumForTest.contains(TEST_PARAM)
- || zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS)))
{
- return DriverManager.getConnection("jdbc:phoenix:" +
zkQuorumForTest,
- PropertiesUtil.combineProperties(props, conf));
- }
- return DriverManager.getConnection("jdbc:phoenix",
- PropertiesUtil.combineProperties(props, conf));
+ String outputQuorum = PhoenixConfigurationUtil.getOutputCluster(conf);
+ if (outputQuorum != null) {
+ // This will not override the quorum set with setOutputClusterUrl
+ Properties copyProps = PropertiesUtil.deepCopy(props);
+ copyProps.setProperty(HConstants.CLIENT_ZOOKEEPER_QUORUM,
outputQuorum);
+ return DriverManager.getConnection(
+ PhoenixConfigurationUtil.getOutputClusterUrl(conf),
+ PropertiesUtil.combineProperties(copyProps, conf));
}
+ return
DriverManager.getConnection(PhoenixConfigurationUtil.getOutputClusterUrl(conf),
+ PropertiesUtil.combineProperties(props, conf));
}
-
- /**
- * Returns the {@link Connection} from a ZooKeeper cluster string.
- * @param quorum a ZooKeeper quorum connection string
- * @param clientPort a ZooKeeper client port
- * @param znodeParent a zookeeper znode parent
- * @return a Phoenix connection to the given connection string
- */
- @Deprecated
- private static Connection getConnection(final String quorum, final Integer
clientPort,
- final String znodeParent, Properties props) throws SQLException {
- Preconditions.checkNotNull(quorum);
- return DriverManager.getConnection(QueryUtil.getUrl(quorum,
clientPort, znodeParent),
- props);
- }
-
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 6a368110c1..da5869175d 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -111,10 +111,16 @@ public final class PhoenixConfigurationUtil {
/** Configuration key for the class name of an
ImportPreUpsertKeyValueProcessor */
public static final String UPSERT_HOOK_CLASS_CONFKEY =
"phoenix.mapreduce.import.kvprocessor";
+ @Deprecated
public static final String MAPREDUCE_INPUT_CLUSTER_QUORUM =
"phoenix.mapreduce.input.cluster.quorum";
-
+
+ @Deprecated
public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM =
"phoneix.mapreduce.output.cluster.quorum";
+ public static final String MAPREDUCE_INPUT_CLUSTER_URL =
"phoenix.mapreduce.input.cluster.url";
+
+ public static final String MAPREDUCE_OUTPUT_CLUSTER_URL =
"phoenix.mapreduce.output.cluster.url";
+
public static final String INDEX_DISABLED_TIMESTAMP_VALUE =
"phoenix.mr.index.disableTimestamp";
public static final String INDEX_MAINTAINERS =
"phoenix.mr.index.maintainers";
@@ -369,6 +375,7 @@ public final class PhoenixConfigurationUtil {
* @param configuration
* @param quorum ZooKeeper quorum string for HBase cluster the MapReduce
job will read from
*/
+ @Deprecated
public static void setInputCluster(final Configuration configuration,
final String quorum) {
Preconditions.checkNotNull(configuration);
@@ -380,12 +387,35 @@ public final class PhoenixConfigurationUtil {
* @param configuration
* @param quorum ZooKeeper quorum string for HBase cluster the MapReduce
job will write to
*/
+ @Deprecated
public static void setOutputCluster(final Configuration configuration,
final String quorum) {
Preconditions.checkNotNull(configuration);
configuration.set(MAPREDUCE_OUTPUT_CLUSTER_QUORUM, quorum);
}
-
+
+ /**
+ * Sets which HBase cluster a Phoenix MapReduce job should read from
+ * @param configuration
+ * @param url Phoenix JDBC URL
+ */
+ public static void setInputClusterUrl(final Configuration configuration,
+ final String url) {
+ Preconditions.checkNotNull(configuration);
+
configuration.set(PhoenixConfigurationUtil.MAPREDUCE_INPUT_CLUSTER_URL, url);
+ }
+
+ /**
+ * Sets which HBase cluster a Phoenix MapReduce job should write to
+ * @param configuration
+ * @param url Phoenix JDBC URL string for HBase cluster the MapReduce job
will write to
+ */
+ public static void setOutputClusterUrl(final Configuration configuration,
+ final String url) {
+ Preconditions.checkNotNull(configuration);
+
configuration.set(PhoenixConfigurationUtil.MAPREDUCE_OUTPUT_CLUSTER_URL, url);
+ }
+
public static Class<?> getInputClass(final Configuration configuration) {
return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
}
@@ -587,32 +617,71 @@ public final class PhoenixConfigurationUtil {
public static String getInputCluster(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
+ if (quorum == null) {
+ quorum = configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+ }
if (quorum == null) {
quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
}
return quorum;
}
+ /**
+ * Returns the Phoenix JDBC URL a Phoenix MapReduce job will read
+ * from. If MAPREDUCE_INPUT_CLUSTER_URL is not set, then it returns the
value of
+ * "jdbc:phoenix"
+ * @param configuration
+ * @return URL string
+ */
+ public static String getInputClusterUrl(final Configuration configuration)
{
+ Preconditions.checkNotNull(configuration);
+ String url = configuration.get(MAPREDUCE_INPUT_CLUSTER_URL);
+ if (url == null) {
+ url = PhoenixRuntime.JDBC_PROTOCOL;
+ }
+ return url;
+ }
+
+
/**
* Returns the ZooKeeper quorum string for the HBase cluster a Phoenix
MapReduce job will
* read from
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
+ @Deprecated
public static String getInputClusterZkQuorum(final Configuration
configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
}
+ /**
+ * Returns the Phoenix JDBC URL a Phoenix MapReduce job will write to.
+ * If MAPREDUCE_OUTPUT_CLUSTER_URL is not set, then it returns the value of
+ * "jdbc:phoenix"
+ * @param configuration
+ * @return URL string
+ */
+ public static String getOutputClusterUrl(final Configuration
configuration) {
+ Preconditions.checkNotNull(configuration);
+ String url = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_URL);
+ if (url == null) {
+ url = PhoenixRuntime.JDBC_PROTOCOL;
+ }
+ return url;
+ }
+
/**
* Returns the value of HConstants.ZOOKEEPER_QUORUM.
* For tests only
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
+ @Deprecated
public static String getZKQuorum(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
- return configuration.get(HConstants.ZOOKEEPER_QUORUM);
+ return configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM,
+ configuration.get(HConstants.ZOOKEEPER_QUORUM));
}
/**
@@ -626,6 +695,9 @@ public final class PhoenixConfigurationUtil {
public static String getOutputCluster(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
+ if (quorum == null) {
+ quorum = configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+ }
if (quorum == null) {
quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
}
@@ -637,6 +709,7 @@ public final class PhoenixConfigurationUtil {
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
+ @Deprecated
public static String getOutputClusterZkQuorum(final Configuration
configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java
index f06376fd43..c90a56d43f 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java
@@ -19,6 +19,10 @@ package org.apache.phoenix.index;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP;
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static
org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
import java.sql.Connection;
import java.util.Arrays;
@@ -34,6 +38,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.IndexUpgradeTool;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.ConnectionlessTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -47,7 +54,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
-public class IndexUpgradeToolTest {
+public class IndexUpgradeToolTest extends BaseConnectionlessQueryTest{
private static final String INPUT_LIST =
"TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3";
private final boolean upgrade;
private static final String DUMMY_STRING_VALUE = "anyValue";
@@ -174,7 +181,11 @@ public class IndexUpgradeToolTest {
}
private void setupConfForConnectionlessQuery(Configuration conf) {
- conf.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
+ String connectionlessUrl = PhoenixRuntime.JDBC_PROTOCOL_ZK +
JDBC_PROTOCOL_SEPARATOR
+ + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR
+ + PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR;
+ PhoenixConfigurationUtil.setInputClusterUrl(conf, connectionlessUrl);
+ PhoenixConfigurationUtil.setOutputClusterUrl(conf, connectionlessUrl);
conf.unset(HConstants.ZOOKEEPER_CLIENT_PORT);
conf.unset(HConstants.ZOOKEEPER_ZNODE_PARENT);
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
index b3677b27d2..e8f2864e8d 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
@@ -17,13 +17,6 @@
*/
package org.apache.phoenix.mapreduce;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.Test;
-import org.mockito.Mockito;
-
import static junit.framework.TestCase.assertTrue;
import static junit.framework.TestCase.fail;
import static org.apache.phoenix.mapreduce.util.
@@ -32,17 +25,33 @@ import static org.apache.phoenix.mapreduce.util.
PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ;
import static org.apache.phoenix.mapreduce.util.
PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ;
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
+import static
org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
import static org.mockito.Mockito.when;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
public class PhoenixMultiViewInputFormatTest {
+ private static String CONNECTIONLESS_URL =
+ JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS +
JDBC_PROTOCOL_TERMINATOR
+ + PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR;
+
@Test
public void testDefaultConfig() throws Exception {
PhoenixMultiViewInputFormat multiViewInputFormat = new
PhoenixMultiViewInputFormat();
Configuration config = new Configuration();
config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
- config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
+ PhoenixConfigurationUtil.setInputClusterUrl(config,
CONNECTIONLESS_URL);
JobContext mockContext = Mockito.mock(JobContext.class);
when(mockContext.getConfiguration()).thenReturn(config);
@@ -58,7 +67,7 @@ public class PhoenixMultiViewInputFormatTest {
Configuration config = new Configuration();
config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
config.set(MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ, "dummy.path");
- config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
+ PhoenixConfigurationUtil.setInputClusterUrl(config,
CONNECTIONLESS_URL);
JobContext mockContext = Mockito.mock(JobContext.class);
when(mockContext.getConfiguration()).thenReturn(config);
@@ -77,7 +86,7 @@ public class PhoenixMultiViewInputFormatTest {
Configuration config = new Configuration();
config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
config.set(MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ, "dummy.path");
- config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
+ PhoenixConfigurationUtil.setInputClusterUrl(config,
CONNECTIONLESS_URL);
JobContext mockContext = Mockito.mock(JobContext.class);
when(mockContext.getConfiguration()).thenReturn(config);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index b678514c64..5052861bb1 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -17,9 +17,6 @@
*/
package org.apache.phoenix.mapreduce.util;
-import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static
org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
import static org.junit.Assert.assertEquals;
import java.sql.Connection;
@@ -28,10 +25,12 @@ import java.sql.DriverManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -45,10 +44,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
private static final String ORIGINAL_CLUSTER_QUORUM = "myzookeeperhost";
private static final String OVERRIDE_CLUSTER_QUORUM =
"myoverridezookeeperhost";
- // This is a hack that relies on the way The URL is re-constructed from
Configuration to
- // generate a Test connection for the MR jobs
- protected static String TEST_ZK_QUORUM =
- CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR +
PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR;
+ protected static String TEST_URL =
TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
@Test
/**
@@ -76,7 +72,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
" AS SELECT * FROM " + tableName + "\n";
conn.createStatement().execute(viewDdl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
+ PhoenixConfigurationUtil.setOutputClusterUrl(configuration,
TEST_URL);
PhoenixConfigurationUtil.setOutputTableName(configuration,
viewName);
PhoenixConfigurationUtil.setPhysicalTableName(configuration,
viewName);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new
String[] {"A_STRING", "A_BINARY", "COL1"});
@@ -105,7 +101,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
+ PhoenixConfigurationUtil.setOutputClusterUrl(configuration,
TEST_URL);
PhoenixConfigurationUtil.setOutputTableName(configuration,
tableName);
PhoenixConfigurationUtil.setPhysicalTableName(configuration,
tableName);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new
String[] {"A_STRING", "A_BINARY", "COL1"});
@@ -132,7 +128,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
+ PhoenixConfigurationUtil.setOutputClusterUrl(configuration,
TEST_URL);
PhoenixConfigurationUtil.setOutputTableName(configuration,
tableName);
PhoenixConfigurationUtil.setPhysicalTableName(configuration,
tableName);
final String upserStatement =
PhoenixConfigurationUtil.getUpsertStatement(configuration);
@@ -153,7 +149,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
+ PhoenixConfigurationUtil.setInputClusterUrl(configuration,
TEST_URL);
PhoenixConfigurationUtil.setInputTableName(configuration,
tableName);
final String selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration);
final String expectedSelectStatement = "SELECT \"A_STRING\" ,
\"A_BINARY\" , \"0\".\"COL1\" FROM " + tableName ;
@@ -175,7 +171,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
+ PhoenixConfigurationUtil.setInputClusterUrl(configuration,
TEST_URL);
PhoenixConfigurationUtil.setInputTableName(configuration,
fullTableName);
final String selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration);
final String expectedSelectStatement = "SELECT \"A_STRING\" ,
\"A_BINARY\" , \"0\".\"COL1\" FROM " + fullTableName;
@@ -195,7 +191,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
+ PhoenixConfigurationUtil.setInputClusterUrl(configuration,
TEST_URL);
PhoenixConfigurationUtil.setInputTableName(configuration,
tableName);
PhoenixConfigurationUtil.setSelectColumnNames(configuration, new
String[]{"A_BINARY"});
final String selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration);
@@ -215,7 +211,7 @@ public class PhoenixConfigurationUtilTest extends
BaseConnectionlessQueryTest {
" (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
+ PhoenixConfigurationUtil.setInputClusterUrl(configuration,
TEST_URL);
PhoenixConfigurationUtil.setSelectColumnNames(configuration,new
String[]{"ID","VCARRAY"});
PhoenixConfigurationUtil.setSchemaType(configuration,
SchemaType.QUERY);
PhoenixConfigurationUtil.setInputTableName(configuration,
tableName);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
index bf260ff258..01f87167da 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
@@ -175,7 +175,11 @@ public class ConnectionlessTest {
@Test
public void testMultipleConnectionQueryServices() throws Exception {
String url1 = getUrl();
- String url2 = url1 + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR +
"LongRunningQueries";
+ // Non-ZK registries don't have heuristics to handle missing URL
elements
+ String url2 =
+ url1 + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+ + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+ + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR +
"LongRunningQueries";
Connection conn1 = DriverManager.getConnection(url1);
try {
assertEquals(StringUtil.EMPTY_STRING,
conn1.getMetaData().getUserName());
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 0719dc5a61..945f3e474a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -28,6 +28,8 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.TestZKConnectionRegistry;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
import org.apache.phoenix.parse.HintNode.Hint;
import org.junit.Test;
@@ -122,6 +124,9 @@ public class QueryUtilTest {
Properties props = new Properties();
// standard lookup. this already checks if we set
hbase.zookeeper.clientPort
Configuration conf = new Configuration(false);
+ // Need this for HBase 3 where ZK is not the default
+ conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+ ZKConnectionInfo.ZK_REGISTRY_NAME);
conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost");
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
String conn = QueryUtil.getConnectionUrl(props, conf);