This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2aaba3c79e HIVE-26439: Skip collecting Kafka delegation tokens if
PLAINTEXT is set (#3488) (Yu-Wen Lai, reviewed by Laszlo Bodor, Zhihua Deng)
2aaba3c79e is described below
commit 2aaba3c79e740ef27fc263b5a8ff33ad679c5a12
Author: Yu-Wen <[email protected]>
AuthorDate: Fri Aug 5 10:42:08 2022 +0800
HIVE-26439: Skip collecting Kafka delegation tokens if PLAINTEXT is set
(#3488) (Yu-Wen Lai, reviewed by Laszlo Bodor, Zhihua Deng)
---
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 24 +++++++++++-----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index d8ac63acea..eb4eef03bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -312,20 +312,13 @@ public class DagUtils {
Map<String, PartitionDesc> partitions = work.getAliasToPartnInfo();
- for (PartitionDesc partition : partitions.values()) {
+ // We don't need to iterate on all partitions, and check the same
TableDesc.
+ PartitionDesc partition =
partitions.values().stream().findFirst().orElse(null);
+ if (partition != null) {
TableDesc tableDesc = partition.getTableDesc();
- boolean tokenCollected = collectKafkaDelegationTokenForTableDesc(dag,
conf, tableDesc);
- if (tokenCollected) {
+ if (collectKafkaDelegationTokenForTableDesc(dag, conf, tableDesc)) {
// don't collect delegation token again, if it was already successful
return;
- } else {
- /*
- * We don't need to iterate on all partitions, and check the same
TableDesc:
- * if partitions[0].getTableDesc() doesn't show a kafka table, let's
break the loop quickly.
- * Note: at this point we cannot return from this method, as
fileSinkTableDescs should
- * be checked too.
- */
- break;
}
}
@@ -339,6 +332,7 @@ public class DagUtils {
/**
* Tries to collect delegation tokens for kafka in the scope of a TableDesc.
+ * If "security.protocol" is set to "PLAINTEXT", we don't need to collect
delegation token at all.
* @param dag
* @param conf
* @param tableDesc
@@ -346,7 +340,13 @@ public class DagUtils {
*/
private boolean collectKafkaDelegationTokenForTableDesc(DAG dag, JobConf
conf, TableDesc tableDesc) {
String kafkaBrokers = (String)
tableDesc.getProperties().get("kafka.bootstrap.servers"); //FIXME:
KafkaTableProperties
- if (kafkaBrokers != null && !kafkaBrokers.isEmpty()) {
+ String consumerSecurityProtocol = (String) tableDesc.getProperties().get(
+ "kafka.consumer." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+ String producerSecurityProtocol = (String) tableDesc.getProperties().get(
+ "kafka.producer." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+ if (kafkaBrokers != null && !kafkaBrokers.isEmpty() &&
+
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(consumerSecurityProtocol)
&&
+
!CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(producerSecurityProtocol))
{
getKafkaDelegationTokenForBrokers(dag, conf, kafkaBrokers);
return true;
}