This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a52087bb DRILL-8527: Hive Limit Push Down (#2997)
78a52087bb is described below

commit 78a52087bb85bcc7034157544193155b87503111
Author: shfshihuafeng <shfshihuaf...@163.com>
AuthorDate: Thu Jul 10 22:28:49 2025 +0800

    DRILL-8527: Hive Limit Push Down (#2997)
---
 .../org/apache/drill/exec/store/hive/HiveScan.java | 41 +++++++++++++++++---
 .../drill/exec/store/hive/HiveStoragePlugin.java   |  2 +-
 .../apache/drill/exec/store/hive/HiveSubScan.java  | 12 +++++-
 .../hive/readers/HiveDefaultRecordReader.java      |  8 ++--
 .../store/hive/readers/HiveTextRecordReader.java   |  4 +-
 .../store/hive/readers/ReadersInitializer.java     |  9 ++---
 .../apache/drill/exec/hive/TestHivePushDown.java   | 45 ++++++++++++++++++++++
 7 files changed, 103 insertions(+), 18 deletions(-)
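
For readers skimming the diff: the change plugs HiveScan into Drill's limit-pushdown hooks on GroupScan (supportsLimitPushdown() and applyLimit(), both visible below) and threads the resulting maxRecords through HiveSubScan into the record readers. A self-contained toy model of that contract, purely illustrative; the two method names mirror the diff, everything else here is an assumption and not Drill code:

// Toy model of the GroupScan limit-pushdown contract implemented below.
// Returning null from applyLimit() signals "already limited", so the
// planner rule stops firing instead of re-planning forever.
public class LimitPushdownSketch {
  interface GroupScan {
    boolean supportsLimitPushdown();
    GroupScan applyLimit(int maxRecords); // null => no change needed
  }

  static final class ToyHiveScan implements GroupScan {
    final int maxRecords; // -1 means "no limit", as HiveStoragePlugin passes by default
    ToyHiveScan(int maxRecords) { this.maxRecords = maxRecords; }
    @Override public boolean supportsLimitPushdown() { return true; }
    @Override public GroupScan applyLimit(int maxRecords) {
      return maxRecords == this.maxRecords ? null : new ToyHiveScan(maxRecords);
    }
  }

  public static void main(String[] args) {
    GroupScan scan = new ToyHiveScan(-1);
    GroupScan limited = scan.applyLimit(1);            // planner pushes LIMIT 1 into the scan
    System.out.println(limited != null);               // true: a limited copy replaces the scan
    System.out.println(limited.applyLimit(1) == null); // true: pushing the same limit again is a no-op
  }
}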

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 3fd1258d02..f26f68eb25 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -65,7 +65,7 @@ public class HiveScan extends AbstractGroupScan {
   private final HiveReadEntry hiveReadEntry;
   private final HiveMetadataProvider metadataProvider;
   private final Map<String, String> confProperties;
-
+  private final int maxRecords;
   private List<List<LogicalInputSplit>> mappings;
   private List<LogicalInputSplit> inputSplits;
 
@@ -77,21 +77,23 @@ public class HiveScan extends AbstractGroupScan {
                   @JsonProperty("hiveStoragePluginConfig") final 
HiveStoragePluginConfig hiveStoragePluginConfig,
                   @JsonProperty("columns") final List<SchemaPath> columns,
                   @JsonProperty("confProperties") final Map<String, String> 
confProperties,
+                  @JsonProperty("maxRecords") final int maxRecords,
                  @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this(userName,
         hiveReadEntry,
        pluginRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
         columns,
-        null, confProperties);
+        null, confProperties, maxRecords);
   }
 
  public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin,
-                  final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException {
+                  final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties, int maxRecords) throws ExecutionSetupException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
     this.confProperties = confProperties;
+    this.maxRecords = maxRecords;
     if (metadataProvider == null) {
      this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf());
     } else {
@@ -106,10 +108,20 @@ public class HiveScan extends AbstractGroupScan {
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.metadataProvider = that.metadataProvider;
     this.confProperties = that.confProperties;
+    this.maxRecords = that.maxRecords;
+  }
+  public HiveScan(final HiveScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns;
+    this.hiveReadEntry = that.hiveReadEntry;
+    this.hiveStoragePlugin = that.hiveStoragePlugin;
+    this.metadataProvider = that.metadataProvider;
+    this.confProperties = that.confProperties;
+    this.maxRecords = maxRecords;
   }
 
  public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
-    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties);
+    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties, maxRecords);
   }
 
   @JsonProperty
@@ -133,6 +145,11 @@ public class HiveScan extends AbstractGroupScan {
     return confProperties;
   }
 
+  @JsonProperty
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getStoragePlugin() {
     return hiveStoragePlugin;
@@ -167,6 +184,19 @@ public class HiveScan extends AbstractGroupScan {
     }
   }
 
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords == this.maxRecords) {
+      return null;
+    }
+    return new HiveScan(this, maxRecords);
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
   @Override
  public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
@@ -189,7 +219,7 @@ public class HiveScan extends AbstractGroupScan {
       }
 
      final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
-      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, maxRecords, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
@@ -271,6 +301,7 @@ public class HiveScan extends AbstractGroupScan {
         + ", partitions= " + partitions
         + ", inputDirectories=" + 
metadataProvider.getInputDirectories(hiveReadEntry)
         + ", confProperties=" + confProperties
+        + ", maxRecords=" + maxRecords
         + "]";
   }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 997d0e39cb..a85159fe1e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -113,7 +113,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
         }
       }
 
-      return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties);
+      return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties, -1);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index b3ec4de509..f49fcd0dd0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -58,7 +58,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   private final List<HivePartition> partitions;
   private final List<SchemaPath> columns;
   private final Map<String, String> confProperties;
-
+  private final int maxRecords;
   @JsonCreator
   public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
                      @JsonProperty("userName") String userName,
@@ -67,6 +67,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns,
                      @JsonProperty("hiveStoragePluginConfig") 
HiveStoragePluginConfig hiveStoragePluginConfig,
+                     @JsonProperty("maxRecords") int maxRecords,
                      @JsonProperty("confProperties") Map<String, String> 
confProperties)
       throws IOException, ExecutionSetupException, 
ReflectiveOperationException {
     this(userName,
@@ -75,6 +76,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
         splitClasses,
         columns,
         registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
+        maxRecords,
         confProperties);
   }
 
@@ -84,6 +86,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
                      final List<String> splitClasses,
                      final List<SchemaPath> columns,
                      final HiveStoragePlugin hiveStoragePlugin,
+                     final Integer maxRecords,
                      final Map<String, String> confProperties)
     throws IOException, ReflectiveOperationException {
     super(userName);
@@ -94,6 +97,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     this.splitClasses = splitClasses;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.maxRecords = maxRecords;
     this.confProperties = confProperties;
 
     for (int i = 0; i < splits.size(); i++) {
@@ -121,6 +125,10 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return columns;
   }
 
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
   @JsonProperty
   public HiveStoragePluginConfig getHiveStoragePluginConfig() {
     return hiveStoragePlugin.getConfig();
@@ -164,7 +172,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @Override
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     try {
-      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, maxRecords, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
index 0e5d54ef13..0605b42b92 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
@@ -217,7 +217,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
    */
   private StructField[] selectedStructFieldRefs;
 
-
+  private final int maxRecords;
   /**
    * Readers constructor called by initializer.
    *
@@ -231,7 +231,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
    */
  public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition partition,
                                 Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
-                                 FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
+                                 FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
     this.hiveTable = table;
     this.partition = partition;
     this.hiveConf = hiveConf;
@@ -243,6 +243,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
     this.partitionValues = new Object[0];
     setColumns(projectedColumns);
     this.fragmentContext = context;
+    this.maxRecords = maxRecords;
   }
 
   @Override
@@ -396,7 +397,8 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
 
     try {
       int recordCount;
-      for (recordCount = 0; (recordCount < TARGET_RECORD_COUNT && hasNextValue(valueHolder)); recordCount++) {
+      int record = maxRecords > 0 ? maxRecords : TARGET_RECORD_COUNT;
+      for (recordCount = 0; (recordCount < record && hasNextValue(valueHolder)); recordCount++) {
        Object deserializedHiveRecord = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) valueHolder));
         outputWriter.setPosition(recordCount);
         readHiveRecordAndInsertIntoRecordBatch(deserializedHiveRecord);
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
index dadd3bd8a5..cb873c78b0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
@@ -58,8 +58,8 @@ public class HiveTextRecordReader extends HiveDefaultRecordReader {
    */
  public HiveTextRecordReader(HiveTableWithColumnCache table, HivePartition partition,
                              Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
-                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
-    super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi);
+                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
+    super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi, maxRecords);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
index fc3d548086..15faccfecc 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
@@ -56,10 +56,10 @@ public class ReadersInitializer {
     final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), ctx.getQueryUserName());
     final List<List<InputSplit>> inputSplits = config.getInputSplits();
     final HiveConf hiveConf = config.getHiveConf();
-
+    final int maxRecords = config.getMaxRecords();
     if (inputSplits.isEmpty()) {
       return Collections.singletonList(
-          readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi)
+          readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi, maxRecords)
       );
     } else {
       IndexedPartitions partitions = getPartitions(config);
@@ -70,7 +70,7 @@ public class ReadersInitializer {
                   partitions.get(idx),
                   inputSplits.get(idx),
                   config.getColumns(),
-                  ctx, hiveConf, proxyUgi))
+                  ctx, hiveConf, proxyUgi, maxRecords))
           .collect(Collectors.toList());
     }
   }
@@ -109,8 +109,7 @@ public class ReadersInitializer {
 
    RecordReader createReader(HiveTableWithColumnCache table, HivePartition partition,
                              Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
-                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi);
-
+                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords);
   }
 
   /**
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java
new file mode 100644
index 0000000000..097bc8d863
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHivePushDown extends HiveTestBase {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testLimitPushDown() throws Exception {
+    String query = "SELECT * FROM hive.`default`.kv LIMIT 1";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 1, 
actualRowCount);
+
+    testPlanMatchingPatterns(query, new String[]{"LIMIT"}, new String[]{"maxRecords=1"});
+  }
+}
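
A subtlety worth noting: the pushed-down maxRecords bounds each reader's batch loop (maxRecords > 0 ? maxRecords : TARGET_RECORD_COUNT above) rather than the global result, so each minor fragment can still surface rows independently and the plan keeps a LIMIT operator to enforce the exact count, which the test asserts. A hypothetical back-of-the-envelope, with the batch size and fragment count both assumed rather than taken from the commit:

// Hypothetical numbers (not from the commit): rows deserialized on the
// first batch with and without the pushdown, before LIMIT trims to 1.
public class LimitSavingsSketch {
  public static void main(String[] args) {
    int targetRecordCount = 4000; // assumed default batch size
    int maxRecords = 1;           // pushed into every HiveSubScan
    int fragments = 4;            // assumed number of parallel readers
    System.out.println("without pushdown: " + fragments * targetRecordCount); // 16000
    System.out.println("with pushdown:    " + fragments * maxRecords);        // 4
  }
}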
