ayushtkn commented on code in PR #4430:
URL: https://github.com/apache/hive/pull/4430#discussion_r1238578830
##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java:
##########
@@ -91,17 +89,13 @@ public Text getTokenAlias() {
*/
private boolean isTokenRequired(TableDesc tableDesc) {
String kafkaBrokers = (String)
tableDesc.getProperties().get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
- String consumerSecurityProtocol = (String) tableDesc.getProperties().get(
- CONSUMER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- String producerSecurityProtocol = (String) tableDesc.getProperties().get(
- PRODUCER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- return kafkaBrokers != null && !kafkaBrokers.isEmpty()
- &&
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(consumerSecurityProtocol)
- &&
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(producerSecurityProtocol);
+ SecurityProtocol protocol =
KafkaUtils.securityProtocol(tableDesc.getProperties());
+ return kafkaBrokers != null && !kafkaBrokers.isEmpty() &&
SecurityProtocol.PLAINTEXT != protocol;
Review Comment:
Can we use ```StringUtils.isEmpty()```
something like
```
!StringUtils.isEmpty(kafkaBrokers) && SecurityProtocol.PLAINTEXT != protocol;
```
##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java:
##########
@@ -91,17 +89,13 @@ public Text getTokenAlias() {
*/
private boolean isTokenRequired(TableDesc tableDesc) {
String kafkaBrokers = (String)
tableDesc.getProperties().get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
- String consumerSecurityProtocol = (String) tableDesc.getProperties().get(
- CONSUMER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- String producerSecurityProtocol = (String) tableDesc.getProperties().get(
- PRODUCER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- return kafkaBrokers != null && !kafkaBrokers.isEmpty()
- &&
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(consumerSecurityProtocol)
- &&
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(producerSecurityProtocol);
+ SecurityProtocol protocol =
KafkaUtils.securityProtocol(tableDesc.getProperties());
+ return kafkaBrokers != null && !kafkaBrokers.isEmpty() &&
SecurityProtocol.PLAINTEXT != protocol;
}
private Token<?> getKafkaDelegationTokenForBrokers(Configuration conf,
TableDesc tableDesc) {
- String kafkaBrokers = (String)
tableDesc.getProperties().get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ Properties tableProperties = tableDesc.getProperties();
+ String kafkaBrokers = (String)
tableProperties.get(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
Review Comment:
can we do
```
String kafkaBrokers =
tableProperties.getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
```
rather than casting to (String) here
##########
kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java:
##########
@@ -75,13 +93,39 @@ public static void stopCluster() {
}
@Test
- public void testObtainTokenNotNull() {
+ public void testObtainTokenFromSamlPlainTextListenerNotNull() {
+ Properties props = new Properties();
+ props.setProperty("kafka.bootstrap.servers",
KafkaBrokerResource.BROKER_SASL_PORT);
+ checkObtainToken(props);
+ }
+
+ @Test
+ public void testObtainTokenFromSamlSslListenerNotNull()
+ throws IOException, URISyntaxException, KeyStoreException,
CertificateException, NoSuchAlgorithmException {
+ Properties props = new Properties();
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(),
+ KafkaBrokerResource.BROKER_SASL_SSL_PORT);
+ // Should the SSL properties be divided between producer/consumer?
Probably not!
+ props.setProperty(SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_SSL.name);
+ props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
"");
+ String pwdAlias = "HereIsAnAliasForTheKeyWhichHoldsTheTrustorePassword";
+ URI storeURI = createCredentialStore(ImmutableMap.of(pwdAlias,
KAFKA_BROKER_RESOURCE.getTruststorePwd()));
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(),
storeURI.toString());
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName(),
pwdAlias);
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName(),
"");
+
props.setProperty(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName(),
"");
+ String location =
KAFKA_BROKER_RESOURCE.getTruststorePath().toUri().toString();
Review Comment:
``getTruststorePath()`` by code can return ``null`` as well, so, if does we
would fail with NPE here
```
Path getTruststorePath() {
return truststoreFile != null ? truststoreFile.toPath() : null;
}
```
##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java:
##########
@@ -114,7 +108,12 @@ private Token<?>
getKafkaDelegationTokenForBrokers(Configuration conf, TableDesc
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
- config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_PLAINTEXT.name);
+ SecurityProtocol protocol = KafkaUtils.securityProtocol(tableProperties);
+ if (protocol == null) {
+ protocol = SecurityProtocol.SASL_PLAINTEXT;
+ LOG.warn("Kafka security.protocol is undefined in table properties.
Using default {}", protocol.name);
Review Comment:
nit:
you could have saved this if check, if in ``KafkaUtils.securityProtocol``
rather than returning ``null``, return the default & put a log before that
##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java:
##########
@@ -397,4 +421,31 @@ static void addKerberosJaasConf(Configuration
configuration, Properties props) {
}
log.info("Kafka client running with following JAAS = [{}]", jaasConf);
}
+
+ /**
+ * Returns the security protocol if one is defined in the properties and
null otherwise.
+ * <p>The following properties are examined to determine the protocol:</p>
+ * <ol>
+ * <li>security.protocol</li>
+ * <li>kafka.consumer.security.protocol</li>
+ * <li>kafka.producer.security.protocol</li>
+ * </ol>
+ * <p>and the first non null/not empty is returned.</p>
+ * <p>Defining multiple security protocols at the same time is invalid but
this method is lenient and tries to pick
+ * the most reasonable option.</p>
+ * @param props the properties from which to obtain the protocol.
+ * @return the security protocol if one is defined in the properties and
null otherwise.
+ */
+ static SecurityProtocol securityProtocol(Properties props) {
+ List<String> securityProtocolConfigs =
Arrays.asList(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ CONSUMER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ PRODUCER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
Review Comment:
Why you need ``Arrays.asList``, can't we directly have an array?
```
String[] securityProtocolConfigs = new String[]
{CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
CONSUMER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
PRODUCER_CONFIGURATION_PREFIX + "." +
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG};
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]