This is an automated email from the ASF dual-hosted git repository.

zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed2f2993ca HIVE-29238: Upgrade Kafka version from 2.5.0 to 3.9.1 (#6110)
2ed2f2993ca is described below

commit 2ed2f2993ca3492f8d910c738245ef7ea5d699a0
Author: ramitg254 <[email protected]>
AuthorDate: Fri Nov 14 14:46:12 2025 +0530

    HIVE-29238: Upgrade Kafka version from 2.5.0 to 3.9.1 (#6110)
---
 itests/qtest-druid/pom.xml                         | 10 ++-
 .../apache/hive/kafka/SingleNodeKafkaCluster.java  | 16 +++--
 .../hadoop/hive/kafka/HiveKafkaProducer.java       | 47 ++++++++++---
 .../hadoop/hive/kafka/KafkaRecordIterator.java     |  2 +-
 .../hadoop/hive/kafka/KafkaRecordReader.java       |  3 +-
 .../hive/kafka/VectorizedKafkaRecordReader.java    |  3 +-
 .../hadoop/hive/kafka/HiveKafkaProducerTest.java   | 13 +++-
 .../hadoop/hive/kafka/KafkaBrokerResource.java     | 10 +--
 .../hadoop/hive/kafka/KafkaRecordIteratorTest.java |  3 +-
 pom.xml                                            |  2 +-
 .../queries/clientpositive/kafka_storage_handler.q |  2 -
 .../kafka/kafka_storage_handler.q.out              | 78 ++++------------------
 12 files changed, 90 insertions(+), 99 deletions(-)

diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index b59fce39ac6..5227f9b05e2 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -36,7 +36,6 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.test.version>2.5.0</kafka.test.version>
     <druid.guice.version>4.1.0</druid.guice.version>
     <slf4j.version>1.7.30</slf4j.version>
   </properties>
@@ -219,12 +218,17 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.12</artifactId>
-      <version>${kafka.test.version}</version>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
-      <version>${kafka.test.version}</version>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
index 746830a9a6b..cd43a37b184 100644
--- a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
@@ -19,7 +19,7 @@
 package org.apache.hive.kafka;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
 
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.IntStream;
+import scala.Option;
 
 /**
  * This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
   private static final String LOCALHOST = "localhost";
 
 
-  private final KafkaServerStartable serverStartable;
+  private final KafkaServer server;
   private final int brokerPort;
   private final String kafkaServer;
 
@@ -81,8 +83,8 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
 
     properties.setProperty("zookeeper.connect", zkString);
     properties.setProperty("broker.id", String.valueOf(1));
-    properties.setProperty("host.name", LOCALHOST);
-    properties.setProperty("port", Integer.toString(brokerPort));
+    properties.setProperty("listeners", "PLAINTEXT://" + LOCALHOST + ":" + 
Integer.toString(brokerPort));
+    properties.setProperty("advertised.listeners", "PLAINTEXT://" + LOCALHOST 
+ ":" + Integer.toString(brokerPort));
     properties.setProperty("log.dir", logDir);
     // This property is very important, we are sending form records with a specific time
     // Thus need to make sure that they don't get DELETED
@@ -94,13 +96,13 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
     properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
     properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
 
-    this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
+    this.server = new KafkaServer(KafkaConfig.fromProps(properties), Time.SYSTEM, Option.empty(), false);
   }
 
 
   @Override
   protected void serviceStart() throws Exception {
-    serverStartable.startup();
+    server.startup();
     log.info("Kafka Server Started on port {}", brokerPort);
 
   }
@@ -108,7 +110,7 @@ protected void serviceStart() throws Exception {
   @Override
   protected void serviceStop() throws Exception {
     log.info("Stopping Kafka Server");
-    serverStartable.shutdown();
+    server.shutdown();
     log.info("Kafka Server Stopped");
   }
 
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index 7a7d0360a01..8aa156bab2e 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -33,6 +33,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.Uuid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +68,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
     kafkaProducer = new KafkaProducer<>(properties);
   }
 
+  @Override
+  public Uuid clientInstanceId(Duration timeout) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override public void initTransactions() {
     kafkaProducer.initTransactions();
   }
@@ -138,11 +144,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
 
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
 
-    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
+    Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
     invoke(transactionManager,
         "transitionTo",
         
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(topicPartitionBookkeeper, "reset");
+    invoke(txnPartitionMap, "reset");
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
@@ -189,14 +195,35 @@ private void flushNewPartitions() {
 
   private synchronized TransactionalRequestResult enqueueNewPartitions() {
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
-    Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-    invoke(transactionManager,
-        "enqueueRequest",
-        new Class[] {txnRequestHandler.getClass().getSuperclass()},
-        new Object[] {txnRequestHandler});
-    return (TransactionalRequestResult) getValue(txnRequestHandler,
-        txnRequestHandler.getClass().getSuperclass(),
-        "result");
+    synchronized (transactionManager) {
+      Object newPartitionsInTransaction =
+              getValue(transactionManager, "newPartitionsInTransaction");
+      Object newPartitionsInTransactionIsEmpty =
+              invoke(newPartitionsInTransaction, "isEmpty");
+      TransactionalRequestResult result;
+      if (newPartitionsInTransactionIsEmpty instanceof Boolean
+              && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+        Object txnRequestHandler =
+                invoke(transactionManager, "addPartitionsToTransactionHandler");
+        invoke(
+                transactionManager,
+                "enqueueRequest",
+                new Class[]{txnRequestHandler.getClass().getSuperclass()},
+                new Object[]{txnRequestHandler});
+
+        result = (TransactionalRequestResult)
+                getValue(
+                        txnRequestHandler,
+                        txnRequestHandler.getClass().getSuperclass(),
+                        "result");
+      } else {
+        // we don't have an operation but this operation string is also used in
+        // addPartitionsToTransactionHandler.
+        result = new TransactionalRequestResult("AddPartitionsToTxn");
+        result.done();
+      }
+      return result;
+    }
   }
 
   @SuppressWarnings("unchecked") private static Enum<?> getEnum(String 
enumFullName) {
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index 74614dea916..66784d28cbb 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -140,7 +140,7 @@ class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
       }
     } else {
       // case seek to beginning of stream
-      consumer.seekToBeginning(Collections.singleton(topicPartition));
+      consumer.seekToBeginning(topicPartitionList);
       // seekToBeginning is lazy thus need to call position() or poll(0)
       this.startOffset = consumer.position(topicPartition);
       LOG.info("Consumer at beginning of topic partition [{}], current start 
offset [{}]",
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
index 3651cf09f8a..2a18cd43bee 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
@@ -30,6 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Properties;
 
@@ -150,7 +151,7 @@ private synchronized void initialize(KafkaInputSplit inputSplit, Configuration j
     LOG.trace("total read bytes [{}]", readBytes);
     if (consumer != null) {
       consumer.wakeup();
-      consumer.close();
+      consumer.close(Duration.ZERO);
     }
   }
 
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
index 5f55bbce20b..bb379f4be05 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
@@ -37,6 +37,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Properties;
 
@@ -150,7 +151,7 @@ private void cleanRowBoat() {
     LOG.trace("total read bytes [{}]", readBytes);
     if (consumer != null) {
       consumer.wakeup();
-      consumer.close();
+      consumer.close(Duration.ZERO);
     }
   }
 
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
index 8c9ed5f99b1..934e8eb30fb 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -158,7 +159,9 @@
   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(3434L, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+            new TopicPartition("dummy_topic", 0),
+            new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -169,7 +172,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+            new TopicPartition("dummy_topic", 0),
+            new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -180,7 +185,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(45L, epoch);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+            new TopicPartition("dummy_topic", 0),
+            new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 }
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index e2f8bbafe01..84a79edeca0 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -26,7 +26,7 @@
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.IPStackUtils;
-import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.rules.ExternalResource;
@@ -41,6 +41,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import scala.Option;
 
 /**
  * Test Helper Class to start and stop a kafka broker.
@@ -106,7 +107,8 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
       brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
       brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
       truststoreFile = File.createTempFile("kafka_truststore", "jks");
-      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
+              .createNewTrustStore(truststoreFile).build());
       brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
     }
     brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +118,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
     kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
     kafkaServer.startup();
     kafkaServer.zkClient();
-    adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+    adminZkClient = new AdminZkClient(kafkaServer.zkClient(), Option.empty());
     LOG.info("Creating kafka TOPIC [{}]", TOPIC);
-    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
   }
 
   /**
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
index b2dbf12817e..3df2c8c4231 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -44,6 +44,7 @@
 
 import javax.annotation.Nullable;
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -304,7 +305,7 @@ private static void sendData(List<ConsumerRecord<byte[], byte[]>> recordList, @N
   @After public void tearDown() {
     this.kafkaRecordIterator = null;
     if (this.consumer != null) {
-      this.consumer.close();
+      this.consumer.close(Duration.ZERO);
     }
   }
 
diff --git a/pom.xml b/pom.xml
index c2a61999945..e3030567fc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,7 +173,7 @@
     <junit.version>4.13.2</junit.version>
     <junit.jupiter.version>5.13.3</junit.jupiter.version>
     <junit.vintage.version>5.13.3</junit.vintage.version>
-    <kafka.version>2.5.0</kafka.version>
+    <kafka.version>3.9.1</kafka.version>
     <kryo.version>5.5.0</kryo.version>
     <reflectasm.version>1.11.9</reflectasm.version>
     <kudu.version>1.17.0</kudu.version>
diff --git a/ql/src/test/queries/clientpositive/kafka_storage_handler.q b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
index b8283e7b7f4..f4670f55348 100644
--- a/ql/src/test/queries/clientpositive/kafka_storage_handler.q
+++ b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
@@ -1,5 +1,3 @@
---! qt:disabled:HIVE-23985
-
 SET hive.vectorized.execution.enabled=true;
 
 CREATE EXTERNAL TABLE kafka_table
diff --git a/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out b/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
index a9a3f5b0bc4..c6396ca267b 100644
--- a/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
+++ b/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
@@ -225,8 +225,10 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 10
 PREHOOK: query: Drop table kafka_table_offsets
 PREHOOK: type: DROPTABLE
+PREHOOK: Output: database:default
 POSTHOOK: query: Drop table kafka_table_offsets
 POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: database:default
 PREHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp)
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
@@ -257,8 +259,10 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 0      -1
 PREHOOK: query: Drop table orc_kafka_table
 PREHOOK: type: DROPTABLE
+PREHOOK: Output: database:default
 POSTHOOK: query: Drop table orc_kafka_table
 POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: database:default
 PREHOOK: query: Create table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint,
  `__time` timestamp , `page` string, `user` string, `language` string,
`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean,
@@ -435,10 +439,12 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 PREHOOK: query: Drop table kafka_table_offsets
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@kafka_table_offsets
+PREHOOK: Output: database:default
 PREHOOK: Output: default@kafka_table_offsets
 POSTHOOK: query: Drop table kafka_table_offsets
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@kafka_table_offsets
+POSTHOOK: Output: database:default
 POSTHOOK: Output: default@kafka_table_offsets
 PREHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp)
 PREHOOK: type: CREATETABLE
@@ -451,10 +457,12 @@ POSTHOOK: Output: default@kafka_table_offsets
 PREHOOK: query: Drop table orc_kafka_table
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@orc_kafka_table
+PREHOOK: Output: database:default
 PREHOOK: Output: default@orc_kafka_table
 POSTHOOK: query: Drop table orc_kafka_table
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@orc_kafka_table
+POSTHOOK: Output: database:default
 POSTHOOK: Output: default@orc_kafka_table
 PREHOOK: query: Create table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint,
  `__time` timestamp , `page` string, `user` string, `language` string,
@@ -1077,7 +1085,7 @@ POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp`
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-OPTIMIZED SQL: SELECT `__offset` AS `$f0`, CAST(`__timestamp` AS TIMESTAMP) AS `$f1`, `__key` AS `$f2`
+OPTIMIZED SQL: SELECT `__offset`, CAST(`__timestamp` AS TIMESTAMP) AS `_c1`, `__key`
 FROM `default`.`wiki_kafka_avro_table`
 GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key`
 STAGE DEPENDENCIES:
@@ -1408,7 +1416,7 @@ POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp`
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-OPTIMIZED SQL: SELECT `__offset` AS `$f0`, CAST(`__timestamp` AS TIMESTAMP) AS `$f1`, `__key` AS `$f2`
+OPTIMIZED SQL: SELECT `__offset`, CAST(`__timestamp` AS TIMESTAMP) AS `_c1`, `__key`
 FROM `default`.`wiki_kafka_avro_table`
 GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key`
 STAGE DEPENDENCIES:
@@ -1569,72 +1577,12 @@ STAGE PLANS:
                     output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat
                     properties:
                       EXTERNAL TRUE
-                      avro.schema.literal {
-  "type" : "record",
-  "name" : "Wikipedia",
-  "namespace" : "org.apache.hive.kafka",
-  "version": "1",
-  "fields" : [ {
-    "name" : "isrobot",
-    "type" : "boolean"
-  }, {
-    "name" : "channel",
-    "type" : "string"
-  }, {
-    "name" : "timestamp",
-    "type" : "string"
-  }, {
-    "name" : "flags",
-    "type" : "string"
-  }, {
-    "name" : "isunpatrolled",
-    "type" : "boolean"
-  }, {
-    "name" : "page",
-    "type" : "string"
-  }, {
-    "name" : "diffurl",
-    "type" : "string"
-  }, {
-    "name" : "added",
-    "type" : "long"
-  }, {
-    "name" : "comment",
-    "type" : "string"
-  }, {
-    "name" : "commentlength",
-    "type" : "long"
-  }, {
-    "name" : "isnew",
-    "type" : "boolean"
-  }, {
-    "name" : "isminor",
-    "type" : "boolean"
-  }, {
-    "name" : "delta",
-    "type" : "long"
-  }, {
-    "name" : "isanonymous",
-    "type" : "boolean"
-  }, {
-    "name" : "user",
-    "type" : "string"
-  }, {
-    "name" : "deltabucket",
-    "type" : "double"
-  }, {
-    "name" : "deleted",
-    "type" : "long"
-  }, {
-    "name" : "namespace",
-    "type" : "string"
-  } ]
-}
+                      avro.schema.literal {"type":"record","name":"Wikipedia","namespace":"org.apache.hive.kafka","fields":[{"name":"isrobot","type":"boolean"},{"name":"channel","type":"string"},{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"isunpatrolled","type":"boolean"},{"name":"page","type":"string"},{"name":"diffurl","type":"string"},{"name":"added","type":"long"},{"name":"comment","type":"string"},{"name":"commentlength","type":"long"},{"name":"isne
 [...]
                       bucketing_version 2
                       column.name.delimiter ,
-                      columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp
+                      columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace
                      columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer'
-                      columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint
+                      columns.types boolean,string,string,string,boolean,string,string,bigint,string,bigint,boolean,boolean,bigint,boolean,string,double,bigint,string
                       hive.kafka.max.retries 6
                       hive.kafka.metadata.poll.timeout.ms 30000
