This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

The following commit(s) were added to refs/heads/master by this push:
     new 78a52087bb DRILL-8527: Hive Limit Push Down (#2997)
78a52087bb is described below

commit 78a52087bb85bcc7034157544193155b87503111
Author: shfshihuafeng <shfshihuaf...@163.com>
AuthorDate: Thu Jul 10 22:28:49 2025 +0800

    DRILL-8527: Hive Limit Push Down (#2997)
---
 .../org/apache/drill/exec/store/hive/HiveScan.java | 41 +++++++++++++++++---
 .../drill/exec/store/hive/HiveStoragePlugin.java   |  2 +-
 .../apache/drill/exec/store/hive/HiveSubScan.java  | 12 +++++-
 .../hive/readers/HiveDefaultRecordReader.java      |  8 ++--
 .../store/hive/readers/HiveTextRecordReader.java   |  4 +-
 .../store/hive/readers/ReadersInitializer.java     |  9 ++---
 .../apache/drill/exec/hive/TestHivePushDown.java   | 45 ++++++++++++++++++++++
 7 files changed, 103 insertions(+), 18 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 3fd1258d02..f26f68eb25 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -65,7 +65,7 @@ public class HiveScan extends AbstractGroupScan {
   private final HiveReadEntry hiveReadEntry;
   private final HiveMetadataProvider metadataProvider;
   private final Map<String, String> confProperties;
-
+  private final int maxRecords;
   private List<List<LogicalInputSplit>> mappings;
 
   private List<LogicalInputSplit> inputSplits;
@@ -77,21 +77,23 @@ public class HiveScan extends AbstractGroupScan {
                   @JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig,
                   @JsonProperty("columns") final List<SchemaPath> columns,
                   @JsonProperty("confProperties") final Map<String, String> confProperties,
+                  @JsonProperty("maxRecords") final int maxRecords,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this(userName,
         hiveReadEntry,
         pluginRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
         columns,
-        null, confProperties);
+        null, confProperties, maxRecords);
   }
 
   public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin,
-                  final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException {
+                  final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties, int maxRecords) throws ExecutionSetupException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
     this.confProperties = confProperties;
+    this.maxRecords = maxRecords;
     if (metadataProvider == null) {
       this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf());
     } else {
@@ -106,10 +108,20 @@ public class HiveScan extends AbstractGroupScan {
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.metadataProvider = that.metadataProvider;
     this.confProperties = that.confProperties;
+    this.maxRecords = that.maxRecords;
+  }
+
+  public HiveScan(final HiveScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns;
+    this.hiveReadEntry = that.hiveReadEntry;
+    this.hiveStoragePlugin = that.hiveStoragePlugin;
+    this.metadataProvider = that.metadataProvider;
+    this.confProperties = that.confProperties;
+    this.maxRecords = maxRecords;
   }
 
   public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
-    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties);
+    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties, maxRecords);
   }
 
   @JsonProperty
@@ -133,6 +145,11 @@ public class HiveScan extends AbstractGroupScan {
     return confProperties;
   }
 
+  @JsonProperty
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getStoragePlugin() {
     return hiveStoragePlugin;
@@ -167,6 +184,19 @@ public class HiveScan extends AbstractGroupScan {
     }
   }
 
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords == this.maxRecords){
+      return null;
+    }
+    return new HiveScan(this, maxRecords);
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
   @Override
   public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
@@ -189,7 +219,7 @@ public class HiveScan extends AbstractGroupScan {
       }
 
       final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
-      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, maxRecords, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
@@ -271,6 +301,7 @@ public class HiveScan extends AbstractGroupScan {
         + ", partitions= " + partitions
        + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
        + ", confProperties=" + confProperties
+       + ", maxRecords=" + maxRecords
        + "]";
   }
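
A note for readers skimming the diff: applyLimit() is the GroupScan hook that
Drill's limit-pushdown rule invokes when it finds a LIMIT above a scan, and
its return contract is what the equality check above serves. The same hook,
restated with explanatory comments:

    @Override
    public GroupScan applyLimit(int maxRecords) {
      // Returning null signals "no change" to the planner; without this
      // short-circuit the rule would keep generating new scans for a limit
      // that has already been absorbed into this one.
      if (maxRecords == this.maxRecords) {
        return null;
      }
      // Otherwise hand back a copy of this scan that carries the limit;
      // the new HiveScan(HiveScan, int) copy constructor exists for this call.
      return new HiveScan(this, maxRecords);
    }

The supportsLimitPushdown() override is the opt-in switch: the default
inherited from AbstractGroupScan declines limit pushdown, so without it the
rule would never call applyLimit() on a Hive scan.
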
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 997d0e39cb..a85159fe1e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -113,7 +113,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
         }
       }
 
-      return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties);
+      return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties, -1);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index b3ec4de509..f49fcd0dd0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -58,7 +58,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   private final List<HivePartition> partitions;
   private final List<SchemaPath> columns;
   private final Map<String, String> confProperties;
-
+  private final int maxRecords;
 
   @JsonCreator
   public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
                      @JsonProperty("userName") String userName,
@@ -67,6 +67,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns,
                      @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
+                     @JsonProperty("maxRecords") int maxRecords,
                      @JsonProperty("confProperties") Map<String, String> confProperties)
       throws IOException, ExecutionSetupException, ReflectiveOperationException {
     this(userName,
@@ -75,6 +76,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
         splitClasses,
         columns,
         registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
+        maxRecords,
         confProperties);
   }
 
@@ -84,6 +86,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
                     final List<String> splitClasses,
                     final List<SchemaPath> columns,
                     final HiveStoragePlugin hiveStoragePlugin,
+                    final Integer maxRecords,
                     final Map<String, String> confProperties)
       throws IOException, ReflectiveOperationException {
     super(userName);
@@ -94,6 +97,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     this.splitClasses = splitClasses;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.maxRecords = maxRecords;
     this.confProperties = confProperties;
 
     for (int i = 0; i < splits.size(); i++) {
@@ -121,6 +125,10 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return columns;
   }
 
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
   @JsonProperty
   public HiveStoragePluginConfig getHiveStoragePluginConfig() {
     return hiveStoragePlugin.getConfig();
@@ -164,7 +172,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     try {
-      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, maxRecords, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
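
Two details here are easy to miss. First, HiveStoragePlugin seeds every fresh
scan with maxRecords = -1, which the rest of the patch treats as the "no limit
pushed down" sentinel; downstream code only honors positive values. Second,
the @JsonCreator/@JsonProperty plumbing is not ceremony: Drill serializes
scans to JSON when distributing plan fragments, so a field that is not mapped
could silently reset to a default on the executor side. A minimal,
self-contained model of that round trip (ScanStub is a stand-in written for
this note, not a Drill class):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ScanStub {
      private final int maxRecords;

      @JsonCreator
      public ScanStub(@JsonProperty("maxRecords") int maxRecords) {
        this.maxRecords = maxRecords;
      }

      @JsonProperty
      public int getMaxRecords() {
        return maxRecords;
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new ScanStub(1)); // {"maxRecords":1}
        ScanStub back = mapper.readValue(json, ScanStub.class);
        System.out.println(back.getMaxRecords()); // prints 1: the limit survives the hop
      }
    }
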
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
index 0e5d54ef13..0605b42b92 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
@@ -217,7 +217,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
    */
   private StructField[] selectedStructFieldRefs;
-
+  private final int maxRecords;
 
   /**
    * Readers constructor called by initializer.
   *
@@ -231,7 +231,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
    */
  public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition partition,
                                 Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
-                                 FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
+                                 FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
     this.hiveTable = table;
     this.partition = partition;
     this.hiveConf = hiveConf;
@@ -243,6 +243,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
     this.partitionValues = new Object[0];
     setColumns(projectedColumns);
     this.fragmentContext = context;
+    this.maxRecords = maxRecords;
   }
 
   @Override
@@ -396,7 +397,8 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
     try {
       int recordCount;
 
-      for (recordCount = 0; (recordCount < TARGET_RECORD_COUNT && hasNextValue(valueHolder)); recordCount++) {
+      int record = maxRecords > 0 ? maxRecords : TARGET_RECORD_COUNT;
+      for (recordCount = 0; (recordCount < record && hasNextValue(valueHolder)); recordCount++) {
         Object deserializedHiveRecord = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) valueHolder));
         outputWriter.setPosition(recordCount);
         readHiveRecordAndInsertIntoRecordBatch(deserializedHiveRecord);
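
The last hunk above is where the pushdown changes execution: the reader's
batch loop is now bounded by the pushed-down limit when one is present. A
reduced model of the new bound (the TARGET_RECORD_COUNT value below is
illustrative; the real constant is defined in the reader):

    class BatchBoundModel {
      // Stand-in for the reader's default batch-size constant; 4000 is assumed here.
      static final int TARGET_RECORD_COUNT = 4000;

      // Mirrors the new loop bound: a pushed-down LIMIT n caps the batch at
      // n records, while the -1 "no limit" sentinel keeps the default size.
      static int batchBound(int maxRecords) {
        return maxRecords > 0 ? maxRecords : TARGET_RECORD_COUNT;
      }

      public static void main(String[] args) {
        System.out.println(batchBound(1));  // 1, a LIMIT 1 batch stops after one record
        System.out.println(batchBound(-1)); // 4000, nothing was pushed down
      }
    }

Since every parallel reader applies this bound independently, a
multi-fragment scan can still emit more than n rows in total; the LIMIT
operator that stays above the scan performs the final trim, which appears to
be why the new test still asserts LIMIT is present in the plan.
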
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
index dadd3bd8a5..cb873c78b0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
@@ -58,8 +58,8 @@ public class HiveTextRecordReader extends HiveDefaultRecordReader {
    */
   public HiveTextRecordReader(HiveTableWithColumnCache table, HivePartition partition,
                               Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
-                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
-    super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi);
+                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
+    super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi, maxRecords);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
index fc3d548086..15faccfecc 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
@@ -56,10 +56,10 @@ public class ReadersInitializer {
     final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), ctx.getQueryUserName());
     final List<List<InputSplit>> inputSplits = config.getInputSplits();
     final HiveConf hiveConf = config.getHiveConf();
-
+    final int maxRecords = config.getMaxRecords();
     if (inputSplits.isEmpty()) {
       return Collections.singletonList(
-          readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi)
+          readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi, maxRecords)
       );
     } else {
       IndexedPartitions partitions = getPartitions(config);
@@ -70,7 +70,7 @@
               partitions.get(idx),
               inputSplits.get(idx),
               config.getColumns(),
-              ctx, hiveConf, proxyUgi))
+              ctx, hiveConf, proxyUgi, maxRecords))
           .collect(Collectors.toList());
     }
   }
@@ -109,8 +109,7 @@
     RecordReader createReader(HiveTableWithColumnCache table, HivePartition partition,
                               Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
-                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi);
-
+                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords);
   }
 
   /**
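
One non-obvious ripple: HiveDefaultRecordReader, HiveTextRecordReader, and
this createReader() interface all widened their signatures in lockstep. That
is forced by the way the initializer appears to bind the concrete readers, as
constructor references against a single functional interface. A reduced model
of the pattern (names simplified; not Drill's actual types):

    class FactoryModel {
      // Stand-in for the initializer's reader-factory interface.
      @FunctionalInterface
      interface ReaderFactory {
        Reader createReader(String table, int maxRecords);
      }

      static class Reader {
        Reader(String table, int maxRecords) {
          // a real reader would use both; the model only needs the shape
        }
      }

      public static void main(String[] args) {
        // A constructor reference satisfies the interface only while the
        // constructor's parameter list matches createReader() exactly, so
        // widening the interface forces every reader constructor to widen too.
        ReaderFactory factory = Reader::new;
        factory.createReader("kv", 1);
      }
    }
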
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java
new file mode 100644
index 0000000000..097bc8d863
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHivePushDown extends HiveTestBase {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testLimitPushDown() throws Exception {
+    String query = "SELECT * FROM hive.`default`.kv LIMIT 1";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 1, actualRowCount);
+
+    testPlanMatchingPatterns(query, new String[]{"LIMIT"}, new String[]{"maxRecords=1"});
+  }
+}
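
For anyone outside Drill's test harness: testSql() executes the query and
returns the observed row count, while testPlanMatchingPatterns() checks the
text plan, with the second argument listing patterns that must appear and the
third listing patterns that must not. Read that way, the committed assertion
is (same call as in the diff, comments added):

    testPlanMatchingPatterns(query,
        new String[]{"LIMIT"},          // a LIMIT operator must remain in the plan
        new String[]{"maxRecords=1"});  // the plan text must not contain maxRecords=1
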