This is an automated email from the ASF dual-hosted git repository.
zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 62c07ca1d9f HIVE-27374: Exception while getting kafka delegation
tokens in Kerberos/SSL enabled clusters (Stamatis Zampetakis reviewed by John
Sherman, Kokila N, Akshat Mathur, Ayush Saxena)
62c07ca1d9f is described below
commit 62c07ca1d9f37efcebc91021876300d2d2ab75a0
Author: Stamatis Zampetakis <[email protected]>
AuthorDate: Mon Jun 12 16:53:27 2023 +0200
HIVE-27374: Exception while getting kafka delegation tokens in Kerberos/SSL
enabled clusters (Stamatis Zampetakis reviewed by John Sherman, Kokila N,
Akshat Mathur, Ayush Saxena)
The Kafka client that is used to obtain the token from the Kafka broker
ignores the user-defined `security.protocol` set in the table
properties (and pretty much every SSL configuration), thus the
connection between the two cannot be established and an exception is
raised. In order to avoid the problem, all the user-defined SSL
properties must be propagated to the Kafka client retrieving the token.
1. Pick up the user-defined security.protocol to obtain the delegation
token.
2. Propagate all necessary SSL properties to Kafka client retrieving
the token.
3. Add KafkaUtils#securityProtocol utility function for fetching the
protocol from properties and refactor
`KafkaDagCredentialSupplier#isTokenRequired` to use it.
4. Accept fetching Kafka stores from any location; the main motivation
is to accept paths to the local file system to facilitate testing
without having to set up a DFS cluster. Anyway, if the copy fails a
proper IO exception will be raised with more details about what went
wrong.
5. Adapt exception in KafkaUtilsTest based on changes in store fetching.
6. Create and configure a SASL_SSL enabled listener in
KafkaBrokerResource to allow tests with SSL.
Closes #4430
---
.../hive/kafka/KafkaDagCredentialSupplier.java | 27 ++++----
.../org/apache/hadoop/hive/kafka/KafkaUtils.java | 55 +++++++++++++++-
.../hadoop/hive/kafka/KafkaBrokerResource.java | 21 +++++-
.../hive/kafka/KafkaDagCredentialSupplierTest.java | 77 ++++++++++++++++++++--
.../apache/hadoop/hive/kafka/KafkaUtilsTest.java | 2 +-
5 files changed, 159 insertions(+), 23 deletions(-)
diff --git
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
index 74ddaacf4f8..2e36ecfb198 100644
---
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
+++
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.hive.kafka;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.tez.DagCredentialSupplier;
@@ -45,9 +46,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import static
org.apache.hadoop.hive.kafka.KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS;
-import static
org.apache.hadoop.hive.kafka.KafkaUtils.CONSUMER_CONFIGURATION_PREFIX;
import static
org.apache.hadoop.hive.kafka.KafkaUtils.KAFKA_DELEGATION_TOKEN_KEY;
-import static
org.apache.hadoop.hive.kafka.KafkaUtils.PRODUCER_CONFIGURATION_PREFIX;
public class KafkaDagCredentialSupplier implements DagCredentialSupplier {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaDagCredentialSupplier.class);
@@ -90,18 +89,14 @@ public class KafkaDagCredentialSupplier implements
DagCredentialSupplier {
* @return true if a Kafka token is required for performing operations on
the specified table and false otherwise.
*/
private boolean isTokenRequired(TableDesc tableDesc) {
- String kafkaBrokers = (String)
tableDesc.getProperties().get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
- String consumerSecurityProtocol = (String) tableDesc.getProperties().get(
- CONSUMER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- String producerSecurityProtocol = (String) tableDesc.getProperties().get(
- PRODUCER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- return kafkaBrokers != null && !kafkaBrokers.isEmpty()
- &&
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(consumerSecurityProtocol)
- &&
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(producerSecurityProtocol);
+ String kafkaBrokers =
tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ SecurityProtocol protocol =
KafkaUtils.securityProtocol(tableDesc.getProperties());
+ return !StringUtils.isEmpty(kafkaBrokers) && SecurityProtocol.PLAINTEXT !=
protocol;
}
private Token<?> getKafkaDelegationTokenForBrokers(Configuration conf,
TableDesc tableDesc) {
- String kafkaBrokers = (String)
tableDesc.getProperties().get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ Properties tableProperties = tableDesc.getProperties();
+ String kafkaBrokers = (String)
tableProperties.get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
LOG.info("Getting kafka credentials for brokers: {}", kafkaBrokers);
String keytab = HiveConf.getVar(conf,
HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
@@ -114,7 +109,12 @@ public class KafkaDagCredentialSupplier implements
DagCredentialSupplier {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
- config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_PLAINTEXT.name);
+ SecurityProtocol protocol = KafkaUtils.securityProtocol(tableProperties);
+ if (protocol == null) {
+ protocol = SecurityProtocol.SASL_PLAINTEXT;
+ LOG.warn("Kafka security.protocol is undefined in table properties.
Using default {}", protocol.name);
+ }
+ config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
String jaasConfig =
String.format("%s %s %s %s serviceName=\"%s\" keyTab=\"%s\"
principal=\"%s\";",
@@ -123,6 +123,9 @@ public class KafkaDagCredentialSupplier implements
DagCredentialSupplier {
config.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
LOG.debug("Jaas config for requesting kafka credentials: {}", jaasConfig);
+ Configuration confCopy = new Configuration(conf);
+ tableProperties.stringPropertyNames().forEach(key -> confCopy.set(key,
tableProperties.getProperty(key)));
+ KafkaUtils.setupKafkaSslProperties(confCopy, config);
CreateDelegationTokenOptions createDelegationTokenOptions = new
CreateDelegationTokenOptions();
try (AdminClient admin = AdminClient.create(config)) {
CreateDelegationTokenResult createResult =
admin.createDelegationToken(createDelegationTokenOptions);
diff --git
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
index 60387f7e7cf..68d20e9aa59 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.kafka;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
@@ -47,6 +49,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +79,8 @@ final class KafkaUtils {
"org.apache.kafka.common.security.scram.ScramLoginModule required "
+ "username=\"%s\" password=\"%s\" serviceName=\"%s\"
tokenauth=true;";
static final Text KAFKA_DELEGATION_TOKEN_KEY = new
Text("KAFKA_DELEGATION_TOKEN");
+ private static final Set<String> SSL_CONFIG_KEYS =
+ ImmutableSet.copyOf(new
ConfigDef().withClientSslSupport().configKeys().keySet());
private KafkaUtils() {
}
@@ -138,6 +143,7 @@ final class KafkaUtils {
}
static void setupKafkaSslProperties(Configuration configuration, Properties
props) {
+ copySSLProperties(configuration, props);
// Setup SSL via credentials keystore if necessary
final String credKeystore =
configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName());
if (!(credKeystore == null) && !credKeystore.isEmpty()) {
@@ -182,11 +188,27 @@ final class KafkaUtils {
}
}
+ /**
+ * Copies Kafka SSL properties from source configuration to target property
map.
+ * <p>
+ * It only copies SSL properties that are present in the source and not
present in the target. It is useful to
+ * propagate global configurations to the Kafka client but also account for
use-cases where table properties are not
+ * using the Hive specific prefixes ({@link #CONSUMER_CONFIGURATION_PREFIX},
{@link #PRODUCER_CONFIGURATION_PREFIX}).
+ * </p>
+ * @param source the configuration from which we will get the properties
+ * @param target the property map to which we will set the properties
+ */
+ private static void copySSLProperties(Configuration source, Properties
target) {
+ for (String p : SSL_CONFIG_KEYS) {
+ String v = source.get(p);
+ if (v != null && !target.containsKey(p)) {
+ target.setProperty(p, v);
+ }
+ }
+ }
+
private static void writeStoreToLocal(Configuration configuration, String
hdfsLoc, String localDest)
throws IOException, URISyntaxException {
- if(!"hdfs".equals(new URI(hdfsLoc).getScheme())) {
- throw new IllegalArgumentException("Kafka stores must be located in
HDFS, but received: " + hdfsLoc);
- }
try {
// Make sure the local resources directory is created
File localDir = new File(localDest);
@@ -397,4 +419,31 @@ final class KafkaUtils {
}
log.info("Kafka client running with following JAAS = [{}]", jaasConf);
}
+
+ /**
+ * Returns the security protocol if one is defined in the properties and
null otherwise.
+ * <p>The following properties are examined to determine the protocol:</p>
+ * <ol>
+ * <li>security.protocol</li>
+ * <li>kafka.consumer.security.protocol</li>
+ * <li>kafka.producer.security.protocol</li>
+ * </ol>
+ * <p>and the first non null/not empty is returned.</p>
+ * <p>Defining multiple security protocols at the same time is invalid but
this method is lenient and tries to pick
+ * the most reasonable option.</p>
+ * @param props the properties from which to obtain the protocol.
+ * @return the security protocol if one is defined in the properties and
null otherwise.
+ */
+ static SecurityProtocol securityProtocol(Properties props) {
+ String[] securityProtocolConfigs = new String[] {
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ CONSUMER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ PRODUCER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG };
+ for (String c : securityProtocolConfigs) {
+ String v = props.getProperty(c);
+ if (v != null && !v.isEmpty()) {
+ return SecurityProtocol.forName(v);
+ }
+ }
+ return null;
+ }
}
diff --git
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index 48faf5ec70b..015b48eb57f 100644
---
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -25,7 +25,9 @@ import kafka.utils.TestUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.EmbeddedZookeeper;
import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestSslUtils;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,12 +49,14 @@ class KafkaBrokerResource extends ExternalResource {
private static final String TOPIC = "TEST-CREATE_TOPIC";
static final String BROKER_IP_PORT = "127.0.0.1:9092";
static final String BROKER_SASL_PORT = "127.0.0.1:9093";
+ static final String BROKER_SASL_SSL_PORT = "127.0.0.1:9094";
private EmbeddedZookeeper zkServer;
private KafkaServer kafkaServer;
private AdminZkClient adminZkClient;
private Path tmpLogDir;
private String principal;
private String keytab;
+ private File truststoreFile;
/**
* Enables SASL for broker using the principal and keytab provided.
@@ -85,6 +89,7 @@ class KafkaBrokerResource extends ExternalResource {
listeners.put("L1", new BrokerListener(BROKER_IP_PORT, "PLAINTEXT"));
if (principal != null) {
listeners.put("L2", new BrokerListener(BROKER_SASL_PORT,
"SASL_PLAINTEXT"));
+ listeners.put("L3", new BrokerListener(BROKER_SASL_SSL_PORT,
"SASL_SSL"));
}
String listenersURLs = listeners.entrySet().stream().map((e) -> e.getKey()
+ "://" + e.getValue().url)
.collect(Collectors.joining(","));
@@ -98,7 +103,10 @@ class KafkaBrokerResource extends ExternalResource {
"com.sun.security.auth.module.Krb5LoginModule required",
"debug=true", "useKeyTab=true", "storeKey=true",
principal, keytab, principal + "/localhost");
brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config",
jaasConfig);
- brokerProps.setProperty("delegation.token.secret.key", "abcd");
+ brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config",
jaasConfig);
+ truststoreFile = File.createTempFile("kafka_truststore", "jks");
+ brokerProps.putAll(new
TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+ brokerProps.setProperty("delegation.token.secret.key",
"AnyValueShouldDoHereItDoesntMatter");
}
brokerProps.setProperty("offsets.topic.replication.factor", "1");
brokerProps.setProperty("transaction.state.log.replication.factor", "1");
@@ -131,6 +139,17 @@ class KafkaBrokerResource extends ExternalResource {
}
}
+ Path getTruststorePath() {
+ if (truststoreFile == null) {
+ throw new IllegalStateException("Truststore is available only when SASL
is in use");
+ }
+ return truststoreFile.toPath();
+ }
+
+ String getTruststorePwd() {
+ return TestSslUtils.TRUST_STORE_PASSWORD;
+ }
+
void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
adminZkClient.deleteTopic(topic);
}
diff --git
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
index fc2650cff6d..9ec03b97c6c 100644
---
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
+++
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.hive.kafka;
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -27,17 +28,32 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.token.Token;
-
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import static
org.apache.hadoop.crypto.key.JavaKeyStoreProvider.KEYSTORE_PASSWORD_DEFAULT;
+import static
org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+
public class KafkaDagCredentialSupplierTest {
private static final java.nio.file.Path KEYSTORE_DIR =
Paths.get(System.getProperty("test.tmp.dir"), "kdc_root_dir" +
UUID.randomUUID());
@@ -75,13 +91,39 @@ public class KafkaDagCredentialSupplierTest {
}
@Test
- public void testObtainTokenNotNull() {
+ public void testObtainTokenFromSamlPlainTextListenerNotNull() {
+ Properties props = new Properties();
+ props.setProperty("kafka.bootstrap.servers",
KafkaBrokerResource.BROKER_SASL_PORT);
+ checkObtainToken(props);
+ }
+
+ @Test
+ public void testObtainTokenFromSamlSslListenerNotNull()
+ throws IOException, URISyntaxException, KeyStoreException,
CertificateException, NoSuchAlgorithmException {
+ Properties props = new Properties();
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(),
+ KafkaBrokerResource.BROKER_SASL_SSL_PORT);
+ // Should the SSL properties be divided between producer/consumer?
Probably not!
+ props.setProperty(SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_SSL.name);
+ props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
"");
+ String pwdAlias = "HereIsAnAliasForTheKeyWhichHoldsTheTrustorePassword";
+ URI storeURI = createCredentialStore(ImmutableMap.of(pwdAlias,
KAFKA_BROKER_RESOURCE.getTruststorePwd()));
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(),
storeURI.toString());
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName(),
pwdAlias);
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName(),
"");
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName(),
"");
+ String location =
KAFKA_BROKER_RESOURCE.getTruststorePath().toUri().toString();
+
props.setProperty(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(),
location);
+ checkObtainToken(props);
+ }
+
+ private void checkObtainToken(Properties kafkaTableProperties) {
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, "KERBEROS");
conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL,
HIVE_USER_NAME + "/localhost");
conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB,
HIVE_USER_KEYTAB.toString());
- TableDesc fileSinkDesc = createKafkaDesc();
+ TableDesc fileSinkDesc = createKafkaDesc(kafkaTableProperties);
MapWork work = createFileSinkWork(fileSinkDesc);
KafkaDagCredentialSupplier supplier = new KafkaDagCredentialSupplier();
Token<?> t = supplier.obtainToken(work,
Collections.singleton(fileSinkDesc), conf);
@@ -98,10 +140,33 @@ public class KafkaDagCredentialSupplierTest {
return work;
}
- private static TableDesc createKafkaDesc() {
- Properties props = new Properties();
+ private static TableDesc createKafkaDesc(Properties props) {
props.setProperty("name", "kafka_table_fake");
- props.setProperty("kafka.bootstrap.servers",
KafkaBrokerResource.BROKER_SASL_PORT);
return new TableDesc(KafkaInputFormat.class, KafkaOutputFormat.class,
props);
}
+
+ /**
+ * Creates a credential store holding the specified keys.
+ * <p>
+ * The {@link
org.apache.hadoop.crypto.key.JavaKeyStoreProvider#KEYSTORE_PASSWORD_DEFAULT} is
used to protect the
+ * keys in the store. Using a smarter/non-default password requires
additional (global) configuration settings so
+ * for the purpose of tests its better to avoid this.
+ * </p>
+ * @param keys a map holding pairs with the key name/alias and its secret
+ * @return a URI to the newly created store
+ */
+ private static URI createCredentialStore(Map<String, String> keys)
+ throws IOException, KeyStoreException, CertificateException,
NoSuchAlgorithmException, URISyntaxException {
+ java.nio.file.Path storePath = Files.createTempFile("credstore", ".jceks");
+ KeyStore ks = KeyStore.getInstance("JCEKS");
+ try (OutputStream fos = Files.newOutputStream(storePath)) {
+ ks.load(null, null);
+ for (Map.Entry<String, String> k : keys.entrySet()) {
+ ks.setKeyEntry(k.getKey(), new SecretKeySpec(k.getValue().getBytes(),
"AES/CTR/NoPadding"),
+ KEYSTORE_PASSWORD_DEFAULT, null);
+ }
+ ks.store(fos, KEYSTORE_PASSWORD_DEFAULT);
+ }
+ return new URI("jceks://file" + storePath);
+ }
}
diff --git
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
index a19adefc712..eb501101c40 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
@@ -71,7 +71,7 @@ public class KafkaUtilsTest {
Assert.assertEquals(new Properties(), props);
}
- @Test(expected = IllegalArgumentException.class) public void
testSetupKafkaSslPropertiesSslNotInHdfs() {
+ @Test(expected = IllegalStateException.class) public void
testSetupKafkaSslPropertiesSslNotInHdfs() {
Configuration config = new Configuration();
config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(),
"nonexistentfile");
config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(),
"madeup");