This is an automated email from the ASF dual-hosted git repository.

zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 62c07ca1d9f HIVE-27374: Exception while getting kafka delegation 
tokens in Kerberos/SSL enabled clusters (Stamatis Zampetakis reviewed by John 
Sherman, Kokila N, Akshat Mathur, Ayush Saxena)
62c07ca1d9f is described below

commit 62c07ca1d9f37efcebc91021876300d2d2ab75a0
Author: Stamatis Zampetakis <[email protected]>
AuthorDate: Mon Jun 12 16:53:27 2023 +0200

    HIVE-27374: Exception while getting kafka delegation tokens in Kerberos/SSL 
enabled clusters (Stamatis Zampetakis reviewed by John Sherman, Kokila N, 
Akshat Mathur, Ayush Saxena)
    
    The Kafka client that is used to obtain the token from the Kafka broker
    ignores the user defined `security.protocol` defined in the table
    properties (and pretty much every SSL configuration) thus the
    connection between the two cannot be established and an exception is
    raised. In order to avoid the problem, all the user-defined SSL
    properties must be propagated to the Kafka client retrieving the token.
    
    1. Pick-up user defined security.protocol to obtain the delegation
    token.
    2. Propagate all necessary SSL properties to Kafka client retrieving
    the token.
    3. Add KafkaUtils#securityProtocol utility function for fetching the
    protocol from properties and refactor
    `KafkaDagCredentialSupplier#isTokenRequired` to use it.
    4. Accept fetching Kafka stores from any location; the main motivation
    is to accept paths to the local file system to facilitate testing
    without having to setup a DFS cluster. Anyways if the copy fails a
    proper IO exception will be raised with more details about what went
    wrong.
    5. Adapt exception in KafkaUtilsTest based on changes in store fetching
    6. Create and configure a SAML_SSL enabled listener in
    KafkaBrokerResource to allow tests with SSL.
    
    Closes #4430
---
 .../hive/kafka/KafkaDagCredentialSupplier.java     | 27 ++++----
 .../org/apache/hadoop/hive/kafka/KafkaUtils.java   | 55 +++++++++++++++-
 .../hadoop/hive/kafka/KafkaBrokerResource.java     | 21 +++++-
 .../hive/kafka/KafkaDagCredentialSupplierTest.java | 77 ++++++++++++++++++++--
 .../apache/hadoop/hive/kafka/KafkaUtilsTest.java   |  2 +-
 5 files changed, 159 insertions(+), 23 deletions(-)

diff --git 
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
 
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
index 74ddaacf4f8..2e36ecfb198 100644
--- 
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
+++ 
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hive.kafka;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.tez.DagCredentialSupplier;
@@ -45,9 +46,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import static 
org.apache.hadoop.hive.kafka.KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS;
-import static 
org.apache.hadoop.hive.kafka.KafkaUtils.CONSUMER_CONFIGURATION_PREFIX;
 import static 
org.apache.hadoop.hive.kafka.KafkaUtils.KAFKA_DELEGATION_TOKEN_KEY;
-import static 
org.apache.hadoop.hive.kafka.KafkaUtils.PRODUCER_CONFIGURATION_PREFIX;
 
 public class KafkaDagCredentialSupplier implements DagCredentialSupplier {
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDagCredentialSupplier.class);
@@ -90,18 +89,14 @@ public class KafkaDagCredentialSupplier implements 
DagCredentialSupplier {
    * @return true if a Kafka token is required for performing operations on 
the specified table and false otherwise.
    */
   private boolean isTokenRequired(TableDesc tableDesc) {
-    String kafkaBrokers = (String) 
tableDesc.getProperties().get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
-    String consumerSecurityProtocol = (String) tableDesc.getProperties().get(
-        CONSUMER_CONFIGURATION_PREFIX + "." + 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
-    String producerSecurityProtocol = (String) tableDesc.getProperties().get(
-        PRODUCER_CONFIGURATION_PREFIX + "." + 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
-    return kafkaBrokers != null && !kafkaBrokers.isEmpty()
-        && 
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(consumerSecurityProtocol)
-        && 
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(producerSecurityProtocol);
+    String kafkaBrokers = 
tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    SecurityProtocol protocol = 
KafkaUtils.securityProtocol(tableDesc.getProperties());
+    return !StringUtils.isEmpty(kafkaBrokers) && SecurityProtocol.PLAINTEXT != 
protocol;
   }
 
   private Token<?> getKafkaDelegationTokenForBrokers(Configuration conf, 
TableDesc tableDesc) {
-    String kafkaBrokers = (String) 
tableDesc.getProperties().get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    Properties tableProperties = tableDesc.getProperties();
+    String kafkaBrokers = (String) 
tableProperties.get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
     LOG.info("Getting kafka credentials for brokers: {}", kafkaBrokers);
 
     String keytab = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
@@ -114,7 +109,12 @@ public class KafkaDagCredentialSupplier implements 
DagCredentialSupplier {
 
     Properties config = new Properties();
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
-    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_PLAINTEXT.name);
+    SecurityProtocol protocol = KafkaUtils.securityProtocol(tableProperties);
+    if (protocol == null) {
+      protocol = SecurityProtocol.SASL_PLAINTEXT;
+      LOG.warn("Kafka security.protocol is undefined in table properties. 
Using default {}", protocol.name);
+    }
+    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
 
     String jaasConfig =
         String.format("%s %s %s %s serviceName=\"%s\" keyTab=\"%s\" 
principal=\"%s\";",
@@ -123,6 +123,9 @@ public class KafkaDagCredentialSupplier implements 
DagCredentialSupplier {
     config.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
 
     LOG.debug("Jaas config for requesting kafka credentials: {}", jaasConfig);
+    Configuration confCopy = new Configuration(conf);
+    tableProperties.stringPropertyNames().forEach(key -> confCopy.set(key, 
tableProperties.getProperty(key)));
+    KafkaUtils.setupKafkaSslProperties(confCopy, config);
     CreateDelegationTokenOptions createDelegationTokenOptions = new 
CreateDelegationTokenOptions();
     try (AdminClient admin = AdminClient.create(config)) {
       CreateDelegationTokenResult createResult = 
admin.createDelegationToken(createDelegationTokenOptions);
diff --git 
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java 
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
index 60387f7e7cf..68d20e9aa59 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.kafka;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
@@ -47,6 +49,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +79,8 @@ final class KafkaUtils {
       "org.apache.kafka.common.security.scram.ScramLoginModule required "
           + "username=\"%s\" password=\"%s\" serviceName=\"%s\" 
tokenauth=true;";
   static final Text KAFKA_DELEGATION_TOKEN_KEY = new 
Text("KAFKA_DELEGATION_TOKEN");
+  private static final Set<String> SSL_CONFIG_KEYS =
+      ImmutableSet.copyOf(new 
ConfigDef().withClientSslSupport().configKeys().keySet());
 
   private KafkaUtils() {
   }
@@ -138,6 +143,7 @@ final class KafkaUtils {
   }
 
   static void setupKafkaSslProperties(Configuration configuration, Properties 
props) {
+    copySSLProperties(configuration, props);
     // Setup SSL via credentials keystore if necessary
     final String credKeystore = 
configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName());
     if (!(credKeystore == null) && !credKeystore.isEmpty()) {
@@ -182,11 +188,27 @@ final class KafkaUtils {
     }
   }
 
+  /**
+   * Copies Kafka SSL properties from source configuration to target property 
map.
+   * <p>
+   * It only copies SSL properties that are present in the source and not 
present in the target. It is useful to
+   * propagate global configurations to the Kafka client but also account for 
use-cases where table properties are not
+   * using the Hive specific prefixes ({@link #CONSUMER_CONFIGURATION_PREFIX}, 
{@link #PRODUCER_CONFIGURATION_PREFIX}).
+   * </p>
+   * @param source the configuration from which we will get the properties 
+   * @param target the property map to which we will set the properties
+   */
+  private static void copySSLProperties(Configuration source, Properties 
target) {
+    for (String p : SSL_CONFIG_KEYS) {
+      String v = source.get(p);
+      if (v != null && !target.containsKey(p)) {
+        target.setProperty(p, v);
+      }
+    }
+  }
+
   private static void writeStoreToLocal(Configuration configuration, String 
hdfsLoc, String localDest)
       throws IOException, URISyntaxException {
-    if(!"hdfs".equals(new URI(hdfsLoc).getScheme())) {
-      throw new IllegalArgumentException("Kafka stores must be located in 
HDFS, but received: " + hdfsLoc);
-    }
     try {
       // Make sure the local resources directory is created
       File localDir = new File(localDest);
@@ -397,4 +419,31 @@ final class KafkaUtils {
     }
     log.info("Kafka client running with following JAAS = [{}]", jaasConf);
   }
+
+  /**
+   * Returns the security protocol if one is defined in the properties and 
null otherwise.
+   * <p>The following properties are examined to determine the protocol:</p>
+   * <ol>
+   *   <li>security.protocol</li>
+   *   <li>kafka.consumer.security.protocol</li>
+   *   <li>kafka.producer.security.protocol</li>
+   * </ol>
+   * <p>and the first non null/not empty is returned.</p>
+   * <p>Defining multiple security protocols at the same time is invalid but 
this method is lenient and tries to pick
+   * the most reasonable option.</p>
+   * @param props the properties from which to obtain the protocol.
+   * @return the security protocol if one is defined in the properties and 
null otherwise.
+   */
+  static SecurityProtocol securityProtocol(Properties props) {
+    String[] securityProtocolConfigs = new String[] { 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+        CONSUMER_CONFIGURATION_PREFIX + "." + 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+        PRODUCER_CONFIGURATION_PREFIX + "." + 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG };
+    for (String c : securityProtocolConfigs) {
+      String v = props.getProperty(c);
+      if (v != null && !v.isEmpty()) {
+        return SecurityProtocol.forName(v);
+      }
+    }
+    return null;
+  }
 }
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index 48faf5ec70b..015b48eb57f 100644
--- 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -25,7 +25,9 @@ import kafka.utils.TestUtils;
 import kafka.zk.AdminZkClient;
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestSslUtils;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,12 +49,14 @@ class KafkaBrokerResource extends ExternalResource {
   private static final String TOPIC = "TEST-CREATE_TOPIC";
   static final String BROKER_IP_PORT = "127.0.0.1:9092";
   static final String BROKER_SASL_PORT = "127.0.0.1:9093";
+  static final String BROKER_SASL_SSL_PORT = "127.0.0.1:9094";
   private EmbeddedZookeeper zkServer;
   private KafkaServer kafkaServer;
   private AdminZkClient adminZkClient;
   private Path tmpLogDir;
   private String principal;
   private String keytab;
+  private File truststoreFile;
 
   /**
    * Enables SASL for broker using the principal and keytab provided.
@@ -85,6 +89,7 @@ class KafkaBrokerResource extends ExternalResource {
     listeners.put("L1", new BrokerListener(BROKER_IP_PORT, "PLAINTEXT"));
     if (principal != null) {
       listeners.put("L2", new BrokerListener(BROKER_SASL_PORT, 
"SASL_PLAINTEXT"));
+      listeners.put("L3", new BrokerListener(BROKER_SASL_SSL_PORT, 
"SASL_SSL"));
     }
     String listenersURLs = listeners.entrySet().stream().map((e) -> e.getKey() 
+ "://" + e.getValue().url)
         .collect(Collectors.joining(","));
@@ -98,7 +103,10 @@ class KafkaBrokerResource extends ExternalResource {
           "com.sun.security.auth.module.Krb5LoginModule required", 
"debug=true", "useKeyTab=true", "storeKey=true",
           principal, keytab, principal + "/localhost");
       brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", 
jaasConfig);
-      brokerProps.setProperty("delegation.token.secret.key", "abcd");
+      brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", 
jaasConfig);
+      truststoreFile = File.createTempFile("kafka_truststore", "jks");
+      brokerProps.putAll(new 
TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+      brokerProps.setProperty("delegation.token.secret.key", 
"AnyValueShouldDoHereItDoesntMatter");
     }
     brokerProps.setProperty("offsets.topic.replication.factor", "1");
     brokerProps.setProperty("transaction.state.log.replication.factor", "1");
@@ -131,6 +139,17 @@ class KafkaBrokerResource extends ExternalResource {
     }
   }
 
+  Path getTruststorePath() {
+    if (truststoreFile == null) {
+      throw new IllegalStateException("Truststore is available only when SASL 
is in use");
+    }
+    return truststoreFile.toPath();
+  }
+
+  String getTruststorePwd() {
+    return TestSslUtils.TRUST_STORE_PASSWORD;
+  }
+
   void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
     adminZkClient.deleteTopic(topic);
   }
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
index fc2650cff6d..9ec03b97c6c 100644
--- 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
+++ 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hive.kafka;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -27,17 +28,32 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.token.Token;
-
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import static 
org.apache.hadoop.crypto.key.JavaKeyStoreProvider.KEYSTORE_PASSWORD_DEFAULT;
+import static 
org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+
 public class KafkaDagCredentialSupplierTest {
   private static final java.nio.file.Path KEYSTORE_DIR =
       Paths.get(System.getProperty("test.tmp.dir"), "kdc_root_dir" + 
UUID.randomUUID());
@@ -75,13 +91,39 @@ public class KafkaDagCredentialSupplierTest {
   }
 
   @Test
-  public void testObtainTokenNotNull() {
+  public void testObtainTokenFromSamlPlainTextListenerNotNull() {
+    Properties props = new Properties();
+    props.setProperty("kafka.bootstrap.servers", 
KafkaBrokerResource.BROKER_SASL_PORT);
+    checkObtainToken(props);
+  }
+
+  @Test
+  public void testObtainTokenFromSamlSslListenerNotNull()
+      throws IOException, URISyntaxException, KeyStoreException, 
CertificateException, NoSuchAlgorithmException {
+    Properties props = new Properties();
+    
props.setProperty(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(),
+        KafkaBrokerResource.BROKER_SASL_SSL_PORT);
+    // Should the SSL properties be divided between producer/consumer? 
Probably not!
+    props.setProperty(SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_SSL.name);
+    props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
"");
+    String pwdAlias = "HereIsAnAliasForTheKeyWhichHoldsTheTrustorePassword";
+    URI storeURI = createCredentialStore(ImmutableMap.of(pwdAlias, 
KAFKA_BROKER_RESOURCE.getTruststorePwd()));
+    
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(),
 storeURI.toString());
+    
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName(),
 pwdAlias);
+    
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName(),
 "");
+    
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName(), 
"");
+    String location = 
KAFKA_BROKER_RESOURCE.getTruststorePath().toUri().toString();
+    
props.setProperty(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(),
 location);
+    checkObtainToken(props);
+  }
+
+  private void checkObtainToken(Properties kafkaTableProperties) {
     HiveConf conf = new HiveConf();
     conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, "KERBEROS");
     conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL, 
HIVE_USER_NAME + "/localhost");
     conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB, 
HIVE_USER_KEYTAB.toString());
 
-    TableDesc fileSinkDesc = createKafkaDesc();
+    TableDesc fileSinkDesc = createKafkaDesc(kafkaTableProperties);
     MapWork work = createFileSinkWork(fileSinkDesc);
     KafkaDagCredentialSupplier supplier = new KafkaDagCredentialSupplier();
     Token<?> t = supplier.obtainToken(work, 
Collections.singleton(fileSinkDesc), conf);
@@ -98,10 +140,33 @@ public class KafkaDagCredentialSupplierTest {
     return work;
   }
 
-  private static TableDesc createKafkaDesc() {
-    Properties props = new Properties();
+  private static TableDesc createKafkaDesc(Properties props) {
     props.setProperty("name", "kafka_table_fake");
-    props.setProperty("kafka.bootstrap.servers", 
KafkaBrokerResource.BROKER_SASL_PORT);
     return new TableDesc(KafkaInputFormat.class, KafkaOutputFormat.class, 
props);
   }
+
+  /**
+   * Creates a credential store holding the specified keys.
+   * <p>
+   * The  {@link 
org.apache.hadoop.crypto.key.JavaKeyStoreProvider#KEYSTORE_PASSWORD_DEFAULT} is 
used to protect the
+   * keys in the store. Using a smarter/non-default password requires 
additional (global) configuration settings so
+   * for the purpose of tests its better to avoid this.
+   * </p>
+   * @param keys a map holding pairs with the key name/alias and its secret
+   * @return a URI to the newly created store
+   */
+  private static URI createCredentialStore(Map<String, String> keys)
+      throws IOException, KeyStoreException, CertificateException, 
NoSuchAlgorithmException, URISyntaxException {
+    java.nio.file.Path storePath = Files.createTempFile("credstore", ".jceks");
+    KeyStore ks = KeyStore.getInstance("JCEKS");
+    try (OutputStream fos = Files.newOutputStream(storePath)) {
+      ks.load(null, null);
+      for (Map.Entry<String, String> k : keys.entrySet()) {
+        ks.setKeyEntry(k.getKey(), new SecretKeySpec(k.getValue().getBytes(), 
"AES/CTR/NoPadding"),
+            KEYSTORE_PASSWORD_DEFAULT, null);
+      }
+      ks.store(fos, KEYSTORE_PASSWORD_DEFAULT);
+    }
+    return new URI("jceks://file" + storePath);
+  }
 }
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
index a19adefc712..eb501101c40 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
@@ -71,7 +71,7 @@ public class KafkaUtilsTest {
     Assert.assertEquals(new Properties(), props);
   }
 
-  @Test(expected = IllegalArgumentException.class) public void 
testSetupKafkaSslPropertiesSslNotInHdfs() {
+  @Test(expected = IllegalStateException.class) public void 
testSetupKafkaSslPropertiesSslNotInHdfs() {
     Configuration config = new Configuration();
     
config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(), 
"nonexistentfile");
     
config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(), 
"madeup");

Reply via email to