This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 5a3b1785c1 DRILL-8353: Format plugin for Delta Lake (#2702)
5a3b1785c1 is described below
commit 5a3b1785c119a5e3986b6cb269a48ba76db94a3b
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Mon Nov 21 17:28:29 2022 +0200
DRILL-8353: Format plugin for Delta Lake (#2702)
---
contrib/format-deltalake/README.md | 36 ++
contrib/format-deltalake/pom.xml | 73 +++
.../drill/exec/store/delta/DeltaGroupScan.java | 579 +++++++++++++++++++++
.../delta/DeltaParquetTableMetadataProvider.java | 81 +++
.../drill/exec/store/delta/DeltaRowGroupScan.java | 141 +++++
.../store/delta/format/DeltaFormatMatcher.java | 68 +++
.../exec/store/delta/format/DeltaFormatPlugin.java | 233 +++++++++
.../delta/format/DeltaFormatPluginConfig.java | 32 ++
.../store/delta/plan/DeltaPluginImplementor.java | 227 ++++++++
.../delta/plan/DrillExprToDeltaTranslator.java | 246 +++++++++
.../store/delta/read/DeltaScanBatchCreator.java | 65 +++
.../main/resources/bootstrap-format-plugins.json | 20 +
.../src/main/resources/drill-module.conf | 24 +
.../drill/exec/store/delta/DeltaQueriesTest.java | 196 +++++++
...-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc | Bin 0 -> 20 bytes
...-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc | Bin 0 -> 20 bytes
.../_delta_log/00000000000000000000.json | 5 +
...9219-4628-8462-cc9c56edfebb-c000.snappy.parquet | Bin 0 -> 1432 bytes
...3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet | Bin 0 -> 1439 bytes
.../_delta_log/00000000000000000000.json | 6 +
...-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc | Bin 0 -> 24 bytes
...0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet | Bin 0 -> 1944 bytes
...-40e7-adbe-60920680770f.c000.snappy.parquet.crc | Bin 0 -> 24 bytes
...a9f5-40e7-adbe-60920680770f.c000.snappy.parquet | Bin 0 -> 1944 bytes
...-43cf-9acb-0fbed63e011c.c000.snappy.parquet.crc | Bin 0 -> 24 bytes
...385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet | Bin 0 -> 1944 bytes
...-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4475-a810-fbd8e7994971-c000.snappy.parquet.crc | Bin 0 -> 32 bytes
.../_delta_log/00000000000000000000.json | 5 +
...50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet | Bin 0 -> 2482 bytes
...36c2-4475-a810-fbd8e7994971-c000.snappy.parquet | Bin 0 -> 2568 bytes
contrib/pom.xml | 3 +-
distribution/pom.xml | 5 +
distribution/src/assemble/component.xml | 1 +
.../apache/drill/exec/expr/StatisticsProvider.java | 79 ++-
.../base/AbstractGroupScanWithMetadata.java | 22 +-
.../exec/planner/logical/DrillConstExecutor.java | 4 +
.../store/parquet/AbstractParquetGroupScan.java | 2 +-
.../store/parquet/AbstractParquetRowGroupScan.java | 4 +
.../parquet/AbstractParquetScanBatchCreator.java | 15 +-
.../exec/store/parquet/FilterEvaluatorUtils.java | 7 +-
.../store/parquet/ParquetScanBatchCreator.java | 4 +-
.../exec/store/plan/AbstractPluginImplementor.java | 5 +
.../drill/exec/store/plan/PluginImplementor.java | 8 +
.../exec/store/plan/rule/PluginFilterRule.java | 18 +-
.../drill/exec/store/plan/rule/PluginJoinRule.java | 2 +-
46 files changed, 2173 insertions(+), 43 deletions(-)
diff --git a/contrib/format-deltalake/README.md b/contrib/format-deltalake/README.md
new file mode 100644
index 0000000000..e4cbaef529
--- /dev/null
+++ b/contrib/format-deltalake/README.md
@@ -0,0 +1,36 @@
+# Delta Lake format plugin
+
+This format plugin enables Drill to query Delta Lake tables.
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project and filter pushdown optimizations.
+
+For project pushdown, only the columns specified in the query are read, even when they are nested columns.
+
+### Filter pushdown
+
+For filter pushdown, all expressions supported by the Delta Lake API are pushed down, so only data that
+matches the filter expression is read. Additionally, filtering logic for Parquet files is enabled
+to allow pruning of Parquet files that do not match the filter expression (a hedged sketch of the
+predicate translation follows the configuration example below).
+
+## Configuration
+
+The format plugin has the following configuration options:
+
+- `type` - format plugin type, should be `'delta'`
+
+### Format config example:
+
+```json
+{
+ "type": "file",
+ "formats": {
+ "delta": {
+ "type": "delta"
+ }
+ }
+}
+```
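As a hedged illustration of the filter pushdown described in the README above: a condition such as `a > 5 AND b = 'x'` maps onto the Delta Standalone expression API that this patch uses (see `DrillExprToDeltaTranslator` below). The column names and literal values here are hypothetical.

```java
import io.delta.standalone.expressions.And;
import io.delta.standalone.expressions.EqualTo;
import io.delta.standalone.expressions.GreaterThan;
import io.delta.standalone.expressions.Literal;
import io.delta.standalone.expressions.Predicate;
import io.delta.standalone.types.StructType;

public class PredicateSketch {
  // Builds the Delta Standalone predicate for WHERE a > 5 AND b = 'x';
  // DrillExprToDeltaTranslator below performs this translation generically.
  static Predicate predicateFor(StructType schema) {
    return new And(
        new GreaterThan(schema.column("a"), Literal.of(5)),
        new EqualTo(schema.column("b"), Literal.of("x")));
  }
}
```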
diff --git a/contrib/format-deltalake/pom.xml b/contrib/format-deltalake/pom.xml
new file mode 100644
index 0000000000..5f0e31f870
--- /dev/null
+++ b/contrib/format-deltalake/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-deltalake-format</artifactId>
+
+ <name>Drill : Contrib : Format : Delta Lake</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-storage</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-standalone_2.13</artifactId>
+ <version>0.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+
+ <!-- Test dependency -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
new file mode 100644
index 0000000000..2130bea075
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.DeltaScan;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.types.ArrayType;
+import io.delta.standalone.types.BinaryType;
+import io.delta.standalone.types.BooleanType;
+import io.delta.standalone.types.ByteType;
+import io.delta.standalone.types.DataType;
+import io.delta.standalone.types.DateType;
+import io.delta.standalone.types.DecimalType;
+import io.delta.standalone.types.DoubleType;
+import io.delta.standalone.types.FloatType;
+import io.delta.standalone.types.IntegerType;
+import io.delta.standalone.types.LongType;
+import io.delta.standalone.types.ShortType;
+import io.delta.standalone.types.StringType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+import io.delta.standalone.types.TimestampType;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.StatisticsProvider;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.metastore.store.FileSystemMetadataProviderManager;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.delta.plan.DrillExprToDeltaTranslator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.metastore.metadata.LocationProvider;
+import org.apache.drill.metastore.metadata.Metadata;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@JsonTypeName("delta-scan")
+public class DeltaGroupScan extends AbstractParquetGroupScan {
+
+ private final DeltaFormatPlugin formatPlugin;
+
+ private final String path;
+
+ private final TupleMetadata schema;
+
+ private final LogicalExpression condition;
+
+ private final DrillFileSystem fs;
+
+ private List<AddFile> addFiles;
+
+ private List<EndpointAffinity> endpointAffinities;
+
+ private final Map<Path, Map<String, String>> partitionHolder;
+
+ @JsonCreator
+ public DeltaGroupScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("entries") List<ReadEntryWithPath> entries,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("schema") TupleMetadata schema,
+ @JsonProperty("path") String path,
+ @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
+ @JsonProperty("condition") LogicalExpression condition,
+ @JsonProperty("limit") Integer limit,
+ @JsonProperty("partitionHolder") Map<Path, Map<String, String>>
partitionHolder,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+ super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, condition);
+ this.formatPlugin = pluginRegistry.resolveFormat(storageConfig, formatConfig, DeltaFormatPlugin.class);
+ this.columns = columns;
+ this.path = path;
+ this.schema = schema;
+ this.condition = condition;
+ this.limit = limit;
+ this.fs = ImpersonationUtil.createFileSystem(
+ ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+
+ DeltaParquetTableMetadataProvider metadataProvider =
+ defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
+ .withEntries(entries)
+ .withFormatPlugin(formatPlugin)
+ .withReaderConfig(readerConfig)
+ .withSchema(schema)
+ .build();
+
+ this.metadataProvider = metadataProvider;
+ this.entries = metadataProvider.getEntries();
+ this.partitionHolder = partitionHolder;
+ this.fileSet = metadataProvider.getFileSet();
+
+ init();
+ }
+
+ private DeltaGroupScan(DeltaGroupScanBuilder builder) throws IOException {
+ super(ImpersonationUtil.resolveUserName(builder.userName), builder.columns,
+ builder.entries, builder.readerConfig, builder.condition);
+ this.formatPlugin = builder.formatPlugin;
+ this.columns = builder.columns;
+ this.path = builder.path;
+ this.schema = builder.schema;
+ this.condition = builder.condition;
+ this.limit = builder.limit;
+ this.fs = ImpersonationUtil.createFileSystem(
+ ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+
+ DeltaParquetTableMetadataProvider metadataProvider =
+ defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
+ .withEntries(entries)
+ .withFormatPlugin(formatPlugin)
+ .withReaderConfig(readerConfig)
+ .withSchema(schema)
+ .build();
+
+ this.metadataProvider = metadataProvider;
+ this.entries = metadataProvider.getEntries();
+ this.partitionHolder = builder.partitionValues;
+ this.fileSet = metadataProvider.getFileSet();
+
+ init();
+ }
+
+ /**
+ * Private constructor, used for cloning.
+ *
+ * @param that The DeltaGroupScan to clone
+ */
+ private DeltaGroupScan(DeltaGroupScan that) {
+ super(that);
+ this.columns = that.columns;
+ this.formatPlugin = that.formatPlugin;
+ this.path = that.path;
+ this.condition = that.condition;
+ this.schema = that.schema;
+ this.mappings = that.mappings;
+ this.fs = that.fs;
+ this.limit = that.limit;
+ this.addFiles = that.addFiles;
+ this.endpointAffinities = that.endpointAffinities;
+ this.partitionHolder = that.partitionHolder;
+ }
+
+ @Override
+ protected DeltaParquetTableMetadataProvider.Builder tableMetadataProviderBuilder(MetadataProviderManager source) {
+ return defaultTableMetadataProviderBuilder(source);
+ }
+
+ @Override
+ protected DeltaParquetTableMetadataProvider.Builder defaultTableMetadataProviderBuilder(MetadataProviderManager source) {
+ return new DeltaParquetTableMetadataProvider.Builder(source);
+ }
+
+ public static DeltaGroupScanBuilder builder() {
+ return new DeltaGroupScanBuilder();
+ }
+
+ @Override
+ public DeltaGroupScan clone(List<SchemaPath> columns) {
+ try {
+ return toBuilder().columns(columns).build();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public DeltaGroupScan applyLimit(int maxRecords) {
+ DeltaGroupScan clone = new DeltaGroupScan(this);
+ clone.limit = maxRecords;
+ return clone;
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) {
+ List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
+ Map<Path, Map<String, String>> subPartitionHolder = new HashMap<>();
+ for (RowGroupReadEntry readEntry : readEntries) {
+ Map<String, String> values = partitionHolder.get(readEntry.getPath());
+ subPartitionHolder.put(readEntry.getPath(), values);
+ }
+ return new DeltaRowGroupScan(getUserName(), formatPlugin, readEntries, columns, subPartitionHolder,
+ readerConfig, filter, getTableMetadata().getSchema());
+ }
+
+ @Override
+ public DeltaGroupScan clone(FileSelection selection) throws IOException {
+ DeltaGroupScan newScan = new DeltaGroupScan(this);
+ newScan.modifyFileSelection(selection);
+ newScan.init();
+ return newScan;
+ }
+
+ @Override
+ protected RowGroupScanFilterer<?> getFilterer() {
+ return new DeltaParquetScanFilterer(this);
+ }
+
+ @Override
+ protected Collection<DrillbitEndpoint> getDrillbits() {
+ return formatPlugin.getContext().getBits();
+ }
+
+ @Override
+ protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
+ FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
+ return clone(newSelection);
+ }
+
+ @Override
+ protected boolean supportsFileImplicitColumns() {
+ // current group scan should populate directory partition values
+ return false;
+ }
+
+ @Override
+ protected List<String> getPartitionValues(LocationProvider locationProvider) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new DeltaGroupScan(this);
+ }
+
+ @Override
+ public boolean supportsLimitPushdown() {
+ return false;
+ }
+
+ @JsonProperty("schema")
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getStorageConfig() {
+ return formatPlugin.getStorageConfig();
+ }
+
+ @JsonProperty("format")
+ public FormatPluginConfig getFormatConfig() {
+ return formatPlugin.getConfig();
+ }
+
+ @JsonProperty("path")
+ public String getPath() {
+ return path;
+ }
+
+ @JsonProperty("condition")
+ public LogicalExpression getCondition() {
+ return condition;
+ }
+
+ @JsonProperty("partitionHolder")
+ public Map<Path, Map<String, String>> getPartitionHolder() {
+ return partitionHolder;
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("path", path)
+ .field("entries", entries)
+ .field("schema", schema)
+ .field("columns", columns)
+ .field("addFiles", addFiles)
+ .field("limit", limit)
+ .field("numFiles", getEntries().size())
+ .toString();
+ }
+
+ public DeltaGroupScanBuilder toBuilder() {
+ return new DeltaGroupScanBuilder()
+ .userName(this.userName)
+ .formatPlugin(this.formatPlugin)
+ .schema(this.schema)
+ .path(this.path)
+ .condition(this.condition)
+ .columns(this.columns)
+ .limit(this.limit);
+ }
+
+ private static class DeltaParquetScanFilterer extends RowGroupScanFilterer<DeltaParquetScanFilterer> {
+
+ public DeltaParquetScanFilterer(DeltaGroupScan source) {
+ super(source);
+ }
+
+ @Override
+ protected AbstractParquetGroupScan getNewScan() {
+ return new DeltaGroupScan((DeltaGroupScan) source);
+ }
+
+ @Override
+ protected DeltaParquetScanFilterer self() {
+ return this;
+ }
+
+ @Override
+ protected <T extends Metadata> Map<SchemaPath, ColumnStatistics<?>> getImplicitColumnStatistics(
+ OptionManager optionManager, T metadata, Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
+ if (metadata instanceof LocationProvider && optionManager != null) {
+ LocationProvider locationProvider = (LocationProvider) metadata;
+ columnsStatistics = new HashMap<>(columnsStatistics);
+ Map<String, String> partitions =
+ ((DeltaGroupScan) source).getPartitionHolder().get(locationProvider.getPath());
+ for (Map.Entry<String, String> partitionValue : partitions.entrySet()) {
+ TypeProtos.MinorType minorType =
+ tableSchema.column(partitionValue.getKey()).getType().getMinorType();
+ String value = partitionValue.getValue();
+ if (value != null) {
+ columnsStatistics.put(SchemaPath.getCompoundPath(partitionValue.getKey()),
+ StatisticsProvider.getConstantColumnStatistics(
+ castPartitionValue(value, minorType), minorType));
+ } else {
+ Long rowCount = TableStatisticsKind.ROW_COUNT.getValue(metadata);
+ columnsStatistics.put(SchemaPath.getCompoundPath(partitionValue.getKey()),
+ StatisticsProvider.getColumnStatistics(null, null, rowCount, minorType));
+ }
+ }
+ }
+
+ return columnsStatistics;
+ }
+
+ private Object castPartitionValue(String value, TypeProtos.MinorType type) {
+ switch (type) {
+ case BIT:
+ return Boolean.parseBoolean(value);
+ case TINYINT:
+ return Byte.parseByte(value);
+ case SMALLINT:
+ return Short.parseShort(value);
+ case INT:
+ return Integer.parseInt(value);
+ case BIGINT:
+ return Long.parseLong(value);
+ case FLOAT4:
+ return Float.parseFloat(value);
+ case FLOAT8:
+ return Double.parseDouble(value);
+ case DATE:
+ return DateUtility.parseLocalDate(value);
+ case TIME:
+ return DateUtility.parseLocalTime(value);
+ case TIMESTAMP:
+ return DateUtility.parseBest(value);
+ case VARCHAR:
+ return value;
+ case VARDECIMAL:
+ return new BigDecimal(value);
+ default:
+ throw new UnsupportedOperationException("Unsupported partition type: " + type);
+ }
+ }
+ }
+
+ public static class DeltaGroupScanBuilder {
+ private String userName;
+
+ private DeltaFormatPlugin formatPlugin;
+
+ private TupleMetadata schema;
+
+ private String path;
+
+ private LogicalExpression condition;
+
+ private List<SchemaPath> columns;
+
+ private int limit;
+
+ private List<ReadEntryWithPath> entries;
+
+ private ParquetReaderConfig readerConfig = ParquetReaderConfig.getDefaultInstance();
+
+ private Map<Path, Map<String, String>> partitionValues;
+
+ public DeltaGroupScanBuilder userName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder formatPlugin(DeltaFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder schema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder path(String path) {
+ this.path = path;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder condition(LogicalExpression condition) {
+ this.condition = condition;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder columns(List<SchemaPath> columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder limit(int maxRecords) {
+ this.limit = maxRecords;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder readerConfig(ParquetReaderConfig readerConfig) {
+ this.readerConfig = readerConfig;
+ return this;
+ }
+
+ public DeltaGroupScan build() throws IOException {
+ DeltaLog log = DeltaLog.forTable(formatPlugin.getFsConf(), path);
+ Snapshot snapshot = log.snapshot();
+ StructType structType = snapshot.getMetadata().getSchema();
+ schema = toSchema(structType);
+
+ DeltaScan scan = Optional.ofNullable(condition)
+ .map(c -> c.accept(new DrillExprToDeltaTranslator(structType), null))
+ .map(snapshot::scan)
+ .orElse(snapshot.scan());
+
+ try {
+ CloseableIterator<AddFile> files = scan.getFiles();
+ ArrayList<AddFile> addFiles = Lists.newArrayList(() -> files);
+ entries = addFiles.stream()
+ .map(addFile -> new ReadEntryWithPath(new Path(URI.create(path).getPath(), URI.create(addFile.getPath()).getPath())))
+ .collect(Collectors.toList());
+
+ partitionValues = addFiles.stream()
+ .collect(Collectors.toMap(
+ addFile -> new Path(URI.create(path).getPath(), URI.create(addFile.getPath()).getPath()),
+ addFile -> collectPartitionedValues(snapshot, addFile)));
+
+ files.close();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+
+ return new DeltaGroupScan(this);
+ }
+
+ private Map<String, String> collectPartitionedValues(Snapshot snapshot, AddFile addFile) {
+ Map<String, String> partitionValues = new LinkedHashMap<>();
+ snapshot.getMetadata().getPartitionColumns().stream()
+ .map(col -> Pair.of(col, addFile.getPartitionValues().get(col)))
+ .forEach(pair -> partitionValues.put(pair.getKey(), pair.getValue()));
+ return partitionValues;
+ }
+
+ private TupleMetadata toSchema(StructType structType) {
+ TupleBuilder tupleBuilder = new TupleBuilder();
+ for (StructField field : structType.getFields()) {
+ tupleBuilder.addColumn(toColumnMetadata(field));
+ }
+
+ return tupleBuilder.schema();
+ }
+
+ private ColumnMetadata toColumnMetadata(StructField field) {
+ DataType dataType = field.getDataType();
+ if (dataType instanceof ArrayType) {
+ DataType elementType = ((ArrayType) dataType).getElementType();
+ if (elementType instanceof ArrayType) {
+ return MetadataUtils.newRepeatedList(field.getName(),
+ toColumnMetadata(new StructField(field.getName(), ((ArrayType) elementType).getElementType(), false)));
+ } else if (elementType instanceof StructType) {
+ return MetadataUtils.newMapArray(field.getName(), toSchema((StructType) elementType));
+ }
+ return MetadataUtils.newScalar(field.getName(), toMinorType(elementType), TypeProtos.DataMode.REPEATED);
+ } else if (dataType instanceof StructType) {
+ return MetadataUtils.newMap(field.getName(), toSchema((StructType) dataType));
+ } else {
+ return MetadataUtils.newScalar(field.getName(), toMinorType(field.getDataType()),
+ field.isNullable() ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED);
+ }
+ }
+
+ private TypeProtos.MinorType toMinorType(DataType dataType) {
+ if (dataType instanceof BinaryType) {
+ return TypeProtos.MinorType.VARBINARY;
+ } else if (dataType instanceof BooleanType) {
+ return TypeProtos.MinorType.BIT;
+ } else if (dataType instanceof ByteType) {
+ return TypeProtos.MinorType.TINYINT;
+ } else if (dataType instanceof DateType) {
+ return TypeProtos.MinorType.DATE;
+ } else if (dataType instanceof DecimalType) {
+ return TypeProtos.MinorType.VARDECIMAL;
+ } else if (dataType instanceof DoubleType) {
+ return TypeProtos.MinorType.FLOAT8;
+ } else if (dataType instanceof FloatType) {
+ return TypeProtos.MinorType.FLOAT4;
+ } else if (dataType instanceof IntegerType) {
+ return TypeProtos.MinorType.INT;
+ } else if (dataType instanceof LongType) {
+ return TypeProtos.MinorType.BIGINT;
+ } else if (dataType instanceof ShortType) {
+ return TypeProtos.MinorType.SMALLINT;
+ } else if (dataType instanceof StringType) {
+ return TypeProtos.MinorType.VARCHAR;
+ } else if (dataType instanceof TimestampType) {
+ return TypeProtos.MinorType.TIMESTAMP;
+ } else {
+ throw new DrillRuntimeException("Unsupported data type: " + dataType);
+ }
+ }
+ }
+}
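For orientation, a minimal sketch of how the builder above is meant to be driven, mirroring `DeltaFormatPlugin.getGroupScan()` later in this patch; the user name and table path are hypothetical, and `SchemaPath.STAR_COLUMN` is Drill's wildcard projection.

```java
import java.io.IOException;
import java.util.Collections;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.delta.DeltaGroupScan;
import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
import org.apache.drill.exec.store.parquet.ParquetReaderConfig;

public class GroupScanSketch {
  // Assembles a DeltaGroupScan the way DeltaFormatPlugin.getGroupScan() does.
  static DeltaGroupScan scanFor(DeltaFormatPlugin formatPlugin) throws IOException {
    return DeltaGroupScan.builder()
        .userName("drill")                                // hypothetical user
        .formatPlugin(formatPlugin)
        .readerConfig(ParquetReaderConfig.getDefaultInstance())
        .path("/data/delta/my_table")                     // hypothetical table root
        .columns(Collections.singletonList(SchemaPath.STAR_COLUMN))
        .limit(-1)                                        // -1 means no LIMIT pushed down
        .build();                                         // build() reads the _delta_log snapshot
  }
}
```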
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java
new file mode 100644
index 0000000000..aeec5b0785
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.BaseParquetMetadataProvider;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Metadata provider for Delta tables that are read by the Drill native Parquet reader.
+ */
+public class DeltaParquetTableMetadataProvider extends BaseParquetMetadataProvider {
+
+ private final DeltaFormatPlugin deltaFormatPlugin;
+
+ private DeltaParquetTableMetadataProvider(Builder builder) throws IOException {
+ super(builder);
+
+ this.deltaFormatPlugin = builder.formatPlugin;
+
+ init((BaseParquetMetadataProvider) builder.metadataProviderManager().getTableMetadataProvider());
+ }
+
+ @Override
+ protected void initInternal() throws IOException {
+ Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
+ for (ReadEntryWithPath entry : entries) {
+ Path path = entry.getPath();
+ FileSystem fs = path.getFileSystem(deltaFormatPlugin.getFsConf());
+ fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
+ }
+ parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, readerConfig);
+ }
+
+ public static class Builder extends BaseParquetMetadataProvider.Builder<Builder> {
+ private DeltaFormatPlugin formatPlugin;
+
+ public Builder(MetadataProviderManager source) {
+ super(source);
+ }
+
+ protected Builder withFormatPlugin(DeltaFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return self();
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public DeltaParquetTableMetadataProvider build() throws IOException {
+ return new DeltaParquetTableMetadataProvider(this);
+ }
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java
new file mode 100644
index 0000000000..5645c3f559
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("delta-row-group-scan")
+public class DeltaRowGroupScan extends AbstractParquetRowGroupScan {
+
+ public static final String OPERATOR_TYPE = "DELTA_ROW_GROUP_SCAN";
+
+ private final DeltaFormatPlugin formatPlugin;
+ private final DeltaFormatPluginConfig formatPluginConfig;
+ private final Map<Path, Map<String, String>> partitions;
+
+ @JsonCreator
+ public DeltaRowGroupScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("userName") String userName,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("formatPluginConfig") FormatPluginConfig formatPluginConfig,
+ @JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry>
rowGroupReadEntries,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("partitions") Map<Path, Map<String, String>> partitions,
+ @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
+ @JsonProperty("filter") LogicalExpression filter,
+ @JsonProperty("schema") TupleMetadata schema) {
+ this(userName,
+ registry.resolveFormat(storageConfig, formatPluginConfig, DeltaFormatPlugin.class),
+ rowGroupReadEntries,
+ columns,
+ partitions,
+ readerConfig,
+ filter,
+ schema);
+ }
+
+ public DeltaRowGroupScan(String userName,
+ DeltaFormatPlugin formatPlugin,
+ List<RowGroupReadEntry> rowGroupReadEntries,
+ List<SchemaPath> columns,
+ Map<Path, Map<String, String>> partitions,
+ ParquetReaderConfig readerConfig,
+ LogicalExpression filter,
+ TupleMetadata schema) {
+ super(userName, rowGroupReadEntries, columns, readerConfig, filter, null, schema);
+ this.formatPlugin = formatPlugin;
+ this.formatPluginConfig = formatPlugin.getConfig();
+ this.partitions = partitions;
+ }
+
+ @JsonProperty
+ public DeltaFormatPluginConfig getFormatPluginConfig() {
+ return formatPluginConfig;
+ }
+
+ @JsonProperty
+ public Map<Path, Map<String, String>> getPartitions() {
+ return partitions;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new DeltaRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, partitions,
+ readerConfig, filter, schema);
+ }
+
+ @Override
+ public String getOperatorType() {
+ return OPERATOR_TYPE;
+ }
+
+ @Override
+ public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
+ return new DeltaRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, partitions,
+ readerConfig, filter, schema);
+ }
+
+ @Override
+ public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) {
+ return formatPlugin.getFsConf();
+ }
+
+ @Override
+ public boolean supportsFileImplicitColumns() {
+ return true;
+ }
+
+ @Override
+ public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
+ return Collections.emptyList();
+ }
+
+ public Map<String, String> getPartitions(RowGroupReadEntry rowGroupReadEntry) {
+ return partitions.get(rowGroupReadEntry.getPath());
+ }
+
+ @Override
+ public boolean isImplicitColumn(SchemaPath path, String partitionColumnLabel) {
+ return partitions.values().stream()
+ .anyMatch(map -> map.containsKey(path.getAsUnescapedPath()));
+ }
+}
+
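The `partitions` map consumed above holds one entry per Parquet file, mapping each partition column name to its string value (as assembled in `DeltaGroupScan.getSpecificScan()`). A hedged sketch of its shape; the file path, column name, and value are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;

public class PartitionsSketch {
  // Shape of the per-file partition values handed to DeltaRowGroupScan.
  static Map<Path, Map<String, String>> example() {
    Map<Path, Map<String, String>> partitions = new HashMap<>();
    partitions.put(new Path("/data/delta/my_table/part-00000.snappy.parquet"),
        Collections.singletonMap("country", "US")); // hypothetical partition column
    return partitions;
  }
}
```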
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java
new file mode 100644
index 0000000000..502a74d5be
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+
+public class DeltaFormatMatcher extends FormatMatcher {
+
+ private final DeltaFormatPlugin formatPlugin;
+
+ public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ }
+
+ @Override
+ public boolean supportDirectoryReads() {
+ return true;
+ }
+
+ @Override
+ public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, FileSystemPlugin fsPlugin,
+ String storageEngineName, SchemaConfig schemaConfig) {
+ if (DeltaLog.forTable(fsPlugin.getFsConf(), selection.getSelectionRoot()).tableExists()) {
+ FormatSelection formatSelection = new FormatSelection(formatPlugin.getConfig(), selection);
+ return new PluginDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(), formatSelection, formatPlugin.getConvention());
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isFileReadable(DrillFileSystem fs, FileStatus status) {
+ return false;
+ }
+
+ @Override
+ public FormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ @Override
+ public int priority() {
+ return HIGH_PRIORITY;
+ }
+}
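The detection step in `isReadable()` can be exercised directly with the Delta Standalone API; a minimal standalone sketch, assuming a default Hadoop configuration and a hypothetical table root.

```java
import io.delta.standalone.DeltaLog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class MatcherSketch {
  // Returns true when the directory holds a readable _delta_log; this is the
  // same check DeltaFormatMatcher.isReadable() runs against the selection root.
  static boolean isDeltaTable(Configuration conf, String root) {
    return DeltaLog.forTable(conf, new Path(root)).tableExists();
  }
}
```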
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java
new file mode 100644
index 0000000000..37e4803ae5
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DeltaFormatPlugin implements FormatPlugin {
+
+ private static final String DELTA_CONVENTION_PREFIX = "DELTA.";
+
+ /**
+ * Generator for format id values. Formats with the same name may be defined
+ * in multiple storage plugins, so a unique id is included in the convention
+ * name to ensure that rule names are unique across plugin instances.
+ */
+ private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+ private final FileSystemConfig storageConfig;
+
+ private final DeltaFormatPluginConfig config;
+
+ private final Configuration fsConf;
+
+ private final DrillbitContext context;
+
+ private final String name;
+
+ private final DeltaFormatMatcher matcher;
+
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+ public DeltaFormatPlugin(
+ String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ FileSystemConfig storageConfig,
+ DeltaFormatPluginConfig config) {
+ this.storageConfig = storageConfig;
+ this.config = config;
+ this.fsConf = fsConf;
+ this.context = context;
+ this.name = name;
+ this.matcher = new DeltaFormatMatcher(this);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+ Convention convention = new Convention.Impl(DELTA_CONVENTION_PREFIX + name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new))
+ .supportsFilterPushdown(true)
+ .supportsProjectPushdown(true)
+ .supportsLimitPushdown(true)
+ .convention(convention)
+ .build();
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+ return getGroupScan(userName, selection, columns, (OptionManager) null);
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options) throws IOException {
+ return getGroupScan(userName, selection, columns, options, null);
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns, OptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+ ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+ .withConf(fsConf)
+ .withOptions(options)
+ .build();
+ return DeltaGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .readerConfig(readerConfig)
+ .path(selection.selectionRoot.toUri().getPath())
+ .columns(columns)
+ .limit(-1)
+ .build();
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+ SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+ TupleMetadata schema = schemaProvider != null
+ ? schemaProvider.read().getSchema()
+ : null;
+ return DeltaGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build())
+ .schema(schema)
+ .path(selection.selectionRoot.toUri().getPath())
+ .columns(columns)
+ .limit(-1)
+ .build();
+ }
+
+ @Override
+ public boolean supportsStatistics() {
+ return false;
+ }
+
+ @Override
+ public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public DeltaFormatPluginConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public FileSystemConfig getStorageConfig() {
+ return storageConfig;
+ }
+
+ @Override
+ public Configuration getFsConf() {
+ return fsConf;
+ }
+
+ @Override
+ public DrillbitContext getContext() {
+ return context;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ public Convention getConvention() {
+ return storagePluginRulesSupplier.convention();
+ }
+
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
new file mode 100644
index 0000000000..9fd7bb7e30
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+@JsonTypeName(DeltaFormatPluginConfig.NAME)
+public class DeltaFormatPluginConfig implements FormatPluginConfig {
+
+ public static final String NAME = "delta";
+
+ @JsonCreator
+ public DeltaFormatPluginConfig() {
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java
new file mode 100644
index 0000000000..84be68cf3b
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillConstExecutor;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class DeltaPluginImplementor extends AbstractPluginImplementor {
+
+ private DeltaGroupScan groupScan;
+
+ @Override
+ public void implement(StoragePluginTableScan scan) {
+ groupScan = (DeltaGroupScan) scan.getGroupScan();
+ }
+
+ @Override
+ public void implement(PluginFilterRel filter) throws IOException {
+ visitChild(filter.getInput());
+
+ RexNode condition = filter.getCondition();
+ LogicalExpression expression = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+ filter.getInput(),
+ condition);
+
+ DrillConstExecutor executor = (DrillConstExecutor) filter.getCluster().getPlanner().getExecutor();
+ PlannerSettings plannerSettings = filter.getCluster().getPlanner().getContext().unwrap(PlannerSettings.class);
+ groupScan = Optional.ofNullable((DeltaGroupScan) groupScan.applyFilter(expression, executor.getUdfUtilities(),
+ plannerSettings.functionImplementationRegistry, plannerSettings.getOptions()))
+ .orElse(groupScan);
+ }
+
+ @Override
+ public void implement(PluginProjectRel project) throws IOException {
+ visitChild(project.getInput());
+
+ DrillParseContext context = new DrillParseContext(PrelUtil.getPlannerSettings(project.getCluster().getPlanner()));
+ RelNode input = project.getInput();
+
+ List<SchemaPath> projects = project.getProjects().stream()
+ .map(e -> (SchemaPath) DrillOptiq.toDrill(context, input, e))
+ .collect(Collectors.toList());
+ groupScan = groupScan.clone(projects);
+ }
+
+ @Override
+ public boolean canImplement(Filter filter) {
+ FilterFinder filterFinder = new FilterFinder();
+ filter.getInput().accept(filterFinder);
+ return filterFinder.getFilter() == null;
+ }
+
+ @Override
+ public void implement(PluginLimitRel limit) throws IOException {
+ visitChild(limit.getInput());
+ int maxRecords = getArtificialLimit(limit);
+ if (maxRecords >= 0) {
+ groupScan = groupScan.applyLimit(maxRecords);
+ }
+ }
+
+ @Override
+ public boolean canImplement(DrillLimitRelBase limit) {
+ if (hasPluginGroupScan(limit)) {
+ FirstLimitFinder finder = new FirstLimitFinder();
+ limit.getInput().accept(finder);
+ int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
+ int newLimit = getArtificialLimit(limit);
+ return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean artificialLimit() {
+ return true;
+ }
+
+ @Override
+ public boolean artificialFilter() {
+ return true;
+ }
+
+ @Override
+ protected Class<? extends StoragePlugin> supportedPlugin() {
+ return FileSystemPlugin.class;
+ }
+
+ @Override
+ public boolean splitProject(Project project) {
+ return true;
+ }
+
+ @Override
+ public boolean canImplement(Project project) {
+ return hasPluginGroupScan(project);
+ }
+
+ @Override
+ public GroupScan getPhysicalOperator() {
+ return groupScan;
+ }
+
+ @Override
+ protected boolean hasPluginGroupScan(RelNode node) {
+ return findGroupScan(node) instanceof DeltaGroupScan;
+ }
+
+ private int rexLiteralIntValue(RexLiteral offset) {
+ return ((BigDecimal) offset.getValue()).intValue();
+ }
+
+ private int getArtificialLimit(DrillLimitRelBase limit) {
+ return getArtificialLimit(limit.getFetch(), limit.getOffset());
+ }
+
+ private int getArtificialLimit(RexNode fetch, RexNode offset) {
+ int maxRows = -1;
+ if (fetch != null) {
+ maxRows = rexLiteralIntValue((RexLiteral) fetch);
+ if (offset != null) {
+ maxRows += rexLiteralIntValue((RexLiteral) offset);
+ }
+ }
+ return maxRows;
+ }
+
+ private static class FilterFinder extends RelShuttleImpl {
+ private RelNode filter;
+
+ @Override
+ public RelNode visit(LogicalFilter filter) {
+ this.filter = filter;
+ return filter;
+ }
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof Filter) {
+ this.filter = other;
+ return other;
+ } else if (other instanceof RelSubset) {
+ RelSubset relSubset = (RelSubset) other;
+ Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+ }
+ return super.visit(other);
+ }
+
+ public RelNode getFilter() {
+ return filter;
+ }
+ }
+
+ private static class FirstLimitFinder extends RelShuttleImpl {
+ private RexNode fetch;
+
+ private RexNode offset;
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof DrillLimitRelBase) {
+ DrillLimitRelBase limitRelBase = (DrillLimitRelBase) other;
+ fetch = limitRelBase.getFetch();
+ offset = limitRelBase.getOffset();
+ return other;
+ } else if (other instanceof RelSubset) {
+ RelSubset relSubset = (RelSubset) other;
+ Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+ }
+ return super.visit(other);
+ }
+
+ public RexNode getFetch() {
+ return fetch;
+ }
+
+ public RexNode getOffset() {
+ return offset;
+ }
+ }
+}
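One detail in `getArtificialLimit()` above deserves a worked example: the pushed-down row cap must also cover rows the query skips, so `LIMIT 10 OFFSET 5` yields a cap of 15. A minimal sketch of that arithmetic:

```java
public class LimitSketch {
  // Mirrors DeltaPluginImplementor.getArtificialLimit(): -1 means "no cap",
  // otherwise the cap is fetch plus any offset rows that are read and skipped.
  static int artificialLimit(Integer fetch, Integer offset) {
    if (fetch == null) {
      return -1;                       // no LIMIT: nothing to push down
    }
    return fetch + (offset == null ? 0 : offset);
  }

  public static void main(String[] args) {
    System.out.println(artificialLimit(10, 5)); // prints 15
  }
}
```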
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java
new file mode 100644
index 0000000000..bd95bdeb35
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import io.delta.standalone.expressions.And;
+import io.delta.standalone.expressions.EqualTo;
+import io.delta.standalone.expressions.Expression;
+import io.delta.standalone.expressions.GreaterThan;
+import io.delta.standalone.expressions.GreaterThanOrEqual;
+import io.delta.standalone.expressions.IsNotNull;
+import io.delta.standalone.expressions.IsNull;
+import io.delta.standalone.expressions.LessThan;
+import io.delta.standalone.expressions.LessThanOrEqual;
+import io.delta.standalone.expressions.Literal;
+import io.delta.standalone.expressions.Not;
+import io.delta.standalone.expressions.Or;
+import io.delta.standalone.expressions.Predicate;
+import io.delta.standalone.types.StructType;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> {
+
+ private final StructType structType;
+
+ public DrillExprToDeltaTranslator(StructType structType) {
+ this.structType = structType;
+ }
+
+ @Override
+ public Expression visitFunctionCall(FunctionCall call, Void value) {
+ try {
+ return visitFunctionCall(call);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private Predicate visitFunctionCall(FunctionCall call) {
+ switch (call.getName()) {
+ case FunctionNames.AND: {
+ Expression left = call.arg(0).accept(this, null);
+ Expression right = call.arg(1).accept(this, null);
+ if (left != null && right != null) {
+ return new And(left, right);
+ }
+ return null;
+ }
+ case FunctionNames.OR: {
+ Expression left = call.arg(0).accept(this, null);
+ Expression right = call.arg(1).accept(this, null);
+ if (left != null && right != null) {
+ return new Or(left, right);
+ }
+ return null;
+ }
+ case FunctionNames.NOT: {
+ Expression expression = call.arg(0).accept(this, null);
+ if (expression != null) {
+ return new Not(expression);
+ }
+ return null;
+ }
+ case FunctionNames.IS_NULL: {
+ LogicalExpression arg = call.arg(0);
+ if (arg instanceof SchemaPath) {
+ String name = getPath((SchemaPath) arg);
+ return new IsNull(structType.column(name));
+ }
+ return null;
+ }
+ case FunctionNames.IS_NOT_NULL: {
+ LogicalExpression arg = call.arg(0);
+ if (arg instanceof SchemaPath) {
+ String name = getPath((SchemaPath) arg);
+ return new IsNotNull(structType.column(name));
+ }
+ return null;
+ }
+ case FunctionNames.LT: {
+ LogicalExpression nameRef = call.arg(0);
+ Expression expression = call.arg(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new LessThan(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.LE: {
+ LogicalExpression nameRef = call.arg(0);
+ Expression expression = call.arg(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new LessThanOrEqual(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.GT: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new GreaterThan(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.GE: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new GreaterThanOrEqual(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.EQ: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new EqualTo(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.NE: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new Not(new EqualTo(structType.column(name), expression));
+ }
+ return null;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) {
+ return Literal.of(fExpr.getFloat());
+ }
+
+ @Override
+ public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) {
+ return Literal.of(intExpr.getInt());
+ }
+
+ @Override
+ public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) {
+ return Literal.of(longExpr.getLong());
+ }
+
+ @Override
+ public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) {
+ return Literal.of(decExpr.getIntFromDecimal());
+ }
+
+ @Override
+ public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) {
+ return Literal.of(decExpr.getLongFromDecimal());
+ }
+
+ @Override
+ public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) {
+ return Literal.of(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) {
+ return Literal.of(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) {
+ return Literal.of(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) {
+ return Literal.of(dateExpr.getDate());
+ }
+
+ @Override
+ public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) {
+ return Literal.of(timeExpr.getTime());
+ }
+
+ @Override
+ public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) {
+ return Literal.of(timestampExpr.getTimeStamp());
+ }
+
+ @Override
+ public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) {
+ return Literal.of(dExpr.getDouble());
+ }
+
+ @Override
+ public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) {
+ return Literal.of(e.getBoolean());
+ }
+
+ @Override
+ public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) {
+ return Literal.of(e.getString());
+ }
+
+ @Override
+ public Expression visitUnknown(LogicalExpression e, Void value) {
+ return null;
+ }
+
+ private static String getPath(SchemaPath schemaPath) {
+ StringBuilder sb = new StringBuilder();
+ PathSegment segment = schemaPath.getRootSegment();
+ sb.append(segment.getNameSegment().getPath());
+
+ while ((segment = segment.getChild()) != null) {
+ sb.append('.')
+ .append(segment.isNamed()
+ ? segment.getNameSegment().getPath()
+ : "element");
+ }
+ return sb.toString();
+ }
+}
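
For orientation, a minimal sketch (not part of this commit) of how a translated
predicate can drive file pruning through the Delta Standalone API follows. The
method name `listMatchingFiles` and the `conf`/`tablePath` inputs are assumptions
for illustration; DeltaLog, Snapshot, DeltaScan, AddFile and CloseableIterator come
from io.delta.standalone, and the actual wiring lives in DeltaGroupScan.

    // Sketch only: translate a Drill filter and ask delta-standalone for the matching files.
    public static List<AddFile> listMatchingFiles(Configuration conf, String tablePath,
        LogicalExpression drillFilter) throws IOException {
      DeltaLog log = DeltaLog.forTable(conf, tablePath);
      Snapshot snapshot = log.update();
      StructType schema = snapshot.getMetadata().getSchema();
      // The translator returns null for filters it cannot express as Delta predicates.
      Expression predicate = drillFilter.accept(new DrillExprToDeltaTranslator(schema), null);
      DeltaScan scan = predicate == null ? snapshot.scan() : snapshot.scan(predicate);
      List<AddFile> files = new ArrayList<>();
      try (CloseableIterator<AddFile> iterator = scan.getFiles()) {
        iterator.forEachRemaining(files::add);
      }
      return files;
    }
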
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java
new file mode 100644
index 0000000000..78cac37892
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.delta.DeltaRowGroupScan;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator
+ implements BatchCreator<DeltaRowGroupScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DeltaRowGroupScan rowGroupScan,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ OperatorContext oContext = context.newOperatorContext(rowGroupScan);
+ return getBatch(context, rowGroupScan, oContext);
+ }
+
+ @Override
+ protected AbstractDrillFileSystemManager getDrillFileSystemCreator(
+ OperatorContext operatorContext, OptionManager optionManager) {
+ return new ParquetScanBatchCreator.ParquetDrillFileSystemManager(operatorContext,
+ optionManager.getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val);
+ }
+
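+ // Explanatory note (not original code): Delta keeps partition values in the
+ // _delta_log commit entries rather than in the data files themselves, so they
+ // are surfaced here as implicit columns via DeltaRowGroupScan.getPartitions().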
+ @Override
+ protected Map<String, String> getImplicitValues(AbstractParquetRowGroupScan rowGroupScan,
+ ColumnExplorer columnExplorer, RowGroupReadEntry rowGroup, DrillFileSystem fs) {
+ return ((DeltaRowGroupScan) rowGroupScan).getPartitions(rowGroup);
+ }
+
+}
diff --git a/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json b/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000000..41d6f2ff69
--- /dev/null
+++ b/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,20 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "delta": {
+ "type": "delta"
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "delta": {
+ "type": "delta"
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-deltalake/src/main/resources/drill-module.conf b/contrib/format-deltalake/src/main/resources/drill-module.conf
new file mode 100644
index 0000000000..d4f6639eb7
--- /dev/null
+++ b/contrib/format-deltalake/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.delta"
+}
diff --git a/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
new file mode 100644
index 0000000000..e74b7d350e
--- /dev/null
+++ b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+
+public class DeltaQueriesTest extends ClusterTest {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+ FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+ Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+ formats.put("delta", new DeltaFormatPluginConfig());
+ FileSystemConfig newPluginConfig = new FileSystemConfig(
+ pluginConfig.getConnection(),
+ pluginConfig.getConfig(),
+ pluginConfig.getWorkspaces(),
+ formats,
+ PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ newPluginConfig.setEnabled(pluginConfig.isEnabled());
+ pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+ dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-primitives"));
+
+ dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-partition-values"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-nested-struct"));
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ String plan = queryBuilder().sql("select * from dfs.`data-reader-partition-values`").explainJson();
+ long count = queryBuilder().physical(plan).run().recordCount();
+ assertEquals(3, count);
+ }
+
+ @Test
+ public void testAllPrimitives() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from dfs.`data-reader-primitives`")
+ .ordered()
+ .baselineColumns("as_int", "as_long", "as_byte", "as_short", "as_boolean", "as_float",
+ "as_double", "as_string", "as_binary", "as_big_decimal")
+ .baselineValues(null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(0, 0L, 0, 0, true, 0.0f, 0.0, "0", new byte[]{0, 0}, BigDecimal.valueOf(0))
+ .baselineValues(1, 1L, 1, 1, false, 1.0f, 1.0, "1", new byte[]{1, 1}, BigDecimal.valueOf(1))
+ .baselineValues(2, 2L, 2, 2, true, 2.0f, 2.0, "2", new byte[]{2, 2}, BigDecimal.valueOf(2))
+ .baselineValues(3, 3L, 3, 3, false, 3.0f, 3.0, "3", new byte[]{3, 3}, BigDecimal.valueOf(3))
+ .baselineValues(4, 4L, 4, 4, true, 4.0f, 4.0, "4", new byte[]{4, 4}, BigDecimal.valueOf(4))
+ .baselineValues(5, 5L, 5, 5, false, 5.0f, 5.0, "5", new byte[]{5, 5}, BigDecimal.valueOf(5))
+ .baselineValues(6, 6L, 6, 6, true, 6.0f, 6.0, "6", new byte[]{6, 6}, BigDecimal.valueOf(6))
+ .baselineValues(7, 7L, 7, 7, false, 7.0f, 7.0, "7", new byte[]{7, 7}, BigDecimal.valueOf(7))
+ .baselineValues(8, 8L, 8, 8, true, 8.0f, 8.0, "8", new byte[]{8, 8}, BigDecimal.valueOf(8))
+ .baselineValues(9, 9L, 9, 9, false, 9.0f, 9.0, "9", new byte[]{9, 9}, BigDecimal.valueOf(9))
+ .go();
+ }
+
+ @Test
+ public void testProjectingColumns() throws Exception {
+
+ String query = "select as_int, as_string from dfs.`data-reader-primitives`";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("columns=\\[`as_int`, `as_string`\\]")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("as_int", "as_string")
+ .baselineValues(null, null)
+ .baselineValues(0, "0")
+ .baselineValues(1, "1")
+ .baselineValues(2, "2")
+ .baselineValues(3, "3")
+ .baselineValues(4, "4")
+ .baselineValues(5, "5")
+ .baselineValues(6, "6")
+ .baselineValues(7, "7")
+ .baselineValues(8, "8")
+ .baselineValues(9, "9")
+ .go();
+ }
+
+ @Test
+ public void testProjectNestedColumn() throws Exception {
+ String query = "select t.a.ac.acb as acb, b from dfs.`data-reader-nested-struct` t";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("columns=\\[`a`.`ac`.`acb`, `b`\\]")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("acb", "b")
+ .baselineValues(0L, 0)
+ .baselineValues(1L, 1)
+ .baselineValues(2L, 2)
+ .baselineValues(3L, 3)
+ .baselineValues(4L, 4)
+ .baselineValues(5L, 5)
+ .baselineValues(6L, 6)
+ .baselineValues(7L, 7)
+ .baselineValues(8L, 8)
+ .baselineValues(9L, 9)
+ .go();
+ }
+
+ @Test
+ public void testPartitionPruning() throws Exception {
+ String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 1";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("numFiles\\=1")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("as_int", "as_string")
+ .baselineValues("1", "1")
+ .go();
+ }
+
+ @Test
+ public void testEmptyResults() throws Exception {
+ String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 101";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("numFiles\\=1")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .expectsEmptyResultSet()
+ .go();
+ }
+
+ @Test
+ public void testLimit() throws Exception {
+ String query = "select as_int, as_string from dfs.`data-reader-partition-values` limit 1";
+
+ // Note that both of the following two limits are expected because this format plugin supports an "artificial" limit.
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("Limit\\(fetch\\=\\[1\\]\\)")
+ .include("limit\\=1")
+ .match();
+
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(1, count);
+ }
+}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc
new file mode 100644
index 0000000000..0dacf51902
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc
new file mode 100644
index 0000000000..976c62f866
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000000..4046b145ff
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1603724040818,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"975ef365-8dec-4bbf-ab88-264c10987001","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aa\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ab\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ac\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aca\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"nam
[...]
+{"add":{"path":"part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet","partitionValues":{},"size":1432,"modificationTime":1603724040000,"dataChange":true}}
+{"add":{"path":"part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet","partitionValues":{},"size":1439,"modificationTime":1603724040000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet
new file mode 100644
index 0000000000..d1b86143ec
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet
new file mode 100644
index 0000000000..b2114ea6d0
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000000..c0cc5a308d
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json
@@ -0,0 +1,6 @@
+{"commitInfo":{"timestamp":1636147668568,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"as_int\",\"as_long\",\"as_byte\",\"as_short\",\"as_boolean\",\"as_float\",\"as_double\",\"as_string\",\"as_string_lit_null\",\"as_date\",\"as_timestamp\",\"as_big_decimal\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputBytes":"5832","numOutputRows":"3"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"as_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_byte\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_short\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_boolean\",\"type\":\"boolean\",\"nullable\":true,\"m
[...]
+{"add":{"path":"as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08%2011%253A11%253A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet","partitionValues":{"as_big_decimal":"0","as_int":"0","as_byte":"0","as_long":"0","as_date":"2021-09-08","as_string":"0","as_timestamp":"2021-09-08
11:11:11","as_float":"0.0","as_short":"0","as_boolean":"true", [...]
+{"add":{"path":"as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-
[...]
+{"add":{"path":"as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08%2011%253A11%253A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet","partitionValues":{"as_big_decimal":"1","as_int":"1","as_byte":"1","as_long":"1","as_date":"2021-09-08","as_string":"1","as_timestamp":"2021-09-08
11:11:11","as_float":"1.0","as_short":"1","as_boolean":"false [...]
diff --git
a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.
[...]
new file mode 100644
index 0000000000..0191061780
Binary files /dev/null and
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc
differ
diff --git
a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_
[...]
new file mode 100644
index 0000000000..e4919ae68f
Binary files /dev/null and
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet
differ
diff --git
a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=
[...]
new file mode 100644
index 0000000000..b79ff09a32
Binary files /dev/null and
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc
differ
diff --git
a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/a
[...]
new file mode 100644
index 0000000000..b67fcb7a8c
Binary files /dev/null and
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08
11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet
differ
diff --git
a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION_
[...]
new file mode 100644
index 0000000000..bf418f64df
Binary files /dev/null and
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DE
[...]
diff --git
a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION_
[...]
new file mode 100644
index 0000000000..4387e52326
Binary files /dev/null and
b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DE
[...]
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc
new file mode 100644
index 0000000000..11f8928c27
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc
new file mode 100644
index 0000000000..852ffc4e2f
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000000..9c9a0d1155
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1607520163636,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputBytes":"5050","numOutputRows":"11"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"as_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_byte\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_short\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_boolean\",\"type\":\"boolean\",\"nullable\":true,\"m
[...]
+{"add":{"path":"part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet","partitionValues":{},"size":2482,"modificationTime":1607520163000,"dataChange":true}}
+{"add":{"path":"part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet","partitionValues":{},"size":2568,"modificationTime":1607520163000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet
new file mode 100644
index 0000000000..b0442b0085
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet
new file mode 100644
index 0000000000..745394ca6c
Binary files /dev/null and b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet differ
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 6fd6925aae..e728da95d3 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -56,6 +56,8 @@
<module>format-xml</module>
<module>format-image</module>
<module>format-pcapng</module>
+ <module>format-iceberg</module>
+ <module>format-deltalake</module>
<module>storage-phoenix</module>
<module>storage-googlesheets</module>
<module>storage-hive</module>
@@ -69,7 +71,6 @@
<module>storage-druid</module>
<module>storage-elasticsearch</module>
<module>storage-cassandra</module>
- <module>format-iceberg</module>
</modules>
</project>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ce39807138..f0fb9537c9 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -484,6 +484,11 @@
<artifactId>drill-iceberg-format</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-deltalake-format</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 38d79f01ae..7708be4929 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -69,6 +69,7 @@
<include>org.apache.drill.contrib:drill-udfs:jar</include>
<include>org.apache.drill.contrib:drill-druid-storage:jar</include>
<include>org.apache.drill.contrib:drill-iceberg-format:jar</include>
+ <include>org.apache.drill.contrib:drill-deltalake-format:jar</include>
</includes>
<outputDirectory>jars</outputDirectory>
<useProjectArtifact>false</useProjectArtifact>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
index 5379760960..93bb04b300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
@@ -31,11 +31,17 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder;
import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
import org.apache.drill.exec.expr.holders.Float4Holder;
import org.apache.drill.exec.expr.holders.Float8Holder;
import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
import org.apache.drill.exec.expr.holders.TimeStampHolder;
import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.ValueHolderHelper;
import org.apache.drill.metastore.statistics.ColumnStatistics;
import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
@@ -52,10 +58,13 @@ public class StatisticsProvider<T extends Comparable<T>> extends AbstractExprVis
private final Map<SchemaPath, ColumnStatistics<?>> columnStatMap;
private final long rowCount;
+ private final UdfUtilities udfUtilities;
- public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> columnStatMap, long rowCount) {
+ public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> columnStatMap, long rowCount,
+ UdfUtilities udfUtilities) {
this.columnStatMap = columnStatMap;
this.rowCount = rowCount;
+ this.udfUtilities = udfUtilities;
}
public long getRowCount() {
@@ -154,12 +163,13 @@ public class StatisticsProvider<T extends Comparable<T>> extends AbstractExprVis
private ColumnStatistics<?> evalCastFunc(FunctionHolderExpression holderExpr, ColumnStatistics<T> input) {
try {
- DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
-
- DrillSimpleFunc interpreter = funcHolder.createInterpreter();
- ValueHolder minHolder;
- ValueHolder maxHolder;
+ T minValue = ComparisonPredicate.getMinValue(input);
+ T maxValue = ComparisonPredicate.getMaxValue(input);
+ if (minValue == null && maxValue == null) {
+ // no need to evaluate cast for null arguments
+ return input;
+ }
TypeProtos.MinorType srcType = holderExpr.args.get(0).getMajorType().getMinorType();
TypeProtos.MinorType destType = holderExpr.getMajorType().getMinorType();
@@ -171,26 +181,33 @@ public class StatisticsProvider<T extends Comparable<T>> extends AbstractExprVis
return null; // cast func between srcType and destType is NOT allowed.
}
+ ValueHolder minHolder;
+ ValueHolder maxHolder;
+
switch (srcType) {
case INT :
- minHolder = ValueHolderHelper.getIntHolder((Integer) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getIntHolder((Integer) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getIntHolder((Integer) minValue);
+ maxHolder = ValueHolderHelper.getIntHolder((Integer) maxValue);
break;
case BIGINT:
- minHolder = ValueHolderHelper.getBigIntHolder((Long) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getBigIntHolder((Long) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getBigIntHolder((Long) minValue);
+ maxHolder = ValueHolderHelper.getBigIntHolder((Long) maxValue);
break;
case FLOAT4:
- minHolder = ValueHolderHelper.getFloat4Holder((Float) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getFloat4Holder((Float) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getFloat4Holder((Float) minValue);
+ maxHolder = ValueHolderHelper.getFloat4Holder((Float) maxValue);
break;
case FLOAT8:
- minHolder = ValueHolderHelper.getFloat8Holder((Double) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getFloat8Holder((Double) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getFloat8Holder((Double) minValue);
+ maxHolder = ValueHolderHelper.getFloat8Holder((Double) maxValue);
break;
case DATE:
- minHolder = ValueHolderHelper.getDateHolder((Long) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getDateHolder((Long) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getDateHolder((Long) minValue);
+ maxHolder = ValueHolderHelper.getDateHolder((Long) maxValue);
+ break;
+ case VARCHAR:
+ minHolder = ValueHolderHelper.getVarCharHolder(udfUtilities.getManagedBuffer(), (String) minValue);
+ maxHolder = ValueHolderHelper.getVarCharHolder(udfUtilities.getManagedBuffer(), (String) maxValue);
break;
default:
return null;
@@ -199,10 +216,20 @@ public class StatisticsProvider<T extends Comparable<T>> extends AbstractExprVis
ValueHolder[] args1 = {minHolder};
ValueHolder[] args2 = {maxHolder};
+ DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
+
+ DrillSimpleFunc interpreter = funcHolder.createInterpreter();
+
ValueHolder minFuncHolder = InterpreterEvaluator.evaluateFunction(interpreter, args1, holderExpr.getName());
ValueHolder maxFuncHolder = InterpreterEvaluator.evaluateFunction(interpreter, args2, holderExpr.getName());
switch (destType) {
+ case BIT:
+ return StatisticsProvider.getColumnStatistics(
+ ((BitHolder) minFuncHolder).value,
+ ((BitHolder) maxFuncHolder).value,
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
case INT:
return StatisticsProvider.getColumnStatistics(
((IntHolder) minFuncHolder).value,
@@ -227,12 +254,32 @@ public class StatisticsProvider<T extends Comparable<T>> extends AbstractExprVis
((Float8Holder) maxFuncHolder).value,
ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
destType);
+ case DATE:
+ return StatisticsProvider.getColumnStatistics(
+ ((DateHolder) minFuncHolder).value,
+ ((DateHolder) maxFuncHolder).value,
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
+ case TIME:
+ return StatisticsProvider.getColumnStatistics(
+ ((TimeHolder) minFuncHolder).value,
+ ((TimeHolder) maxFuncHolder).value,
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
case TIMESTAMP:
return StatisticsProvider.getColumnStatistics(
((TimeStampHolder) minFuncHolder).value,
((TimeStampHolder) maxFuncHolder).value,
ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
destType);
+ case VARDECIMAL:
+ VarDecimalHolder minVarDecimalHolder = (VarDecimalHolder) minFuncHolder;
+ VarDecimalHolder maxVarDecimalHolder = (VarDecimalHolder) maxFuncHolder;
+ return StatisticsProvider.getColumnStatistics(
+ DecimalUtility.getBigDecimalFromDrillBuf(minVarDecimalHolder.buffer, minVarDecimalHolder.start, minVarDecimalHolder.scale, minVarDecimalHolder.precision),
+ DecimalUtility.getBigDecimalFromDrillBuf(maxVarDecimalHolder.buffer, maxVarDecimalHolder.start, maxVarDecimalHolder.scale, maxVarDecimalHolder.precision),
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
default:
return null;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index a11f834761..2a701ce174 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.metastore.analyze.FileMetadataInfoCollector;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -1076,25 +1077,21 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
for (T metadata : metadataList) {
TupleMetadata schema = metadata.getSchema();
if (schema != null && !tableSchema.isEquivalent(schema)) {
+ schema = FixedReceiver.Builder.mergeSchemas(schema, tableSchema);
filterPredicate = getFilterPredicate(filterExpression, udfUtilities,
context, optionManager, true, true, schema);
}
Map<SchemaPath, ColumnStatistics<?>> columnsStatistics =
metadata.getColumnsStatistics();
// adds partition (dir) column statistics if it may be used during filter evaluation
- if (metadata instanceof LocationProvider && optionManager != null) {
- LocationProvider locationProvider = (LocationProvider) metadata;
- columnsStatistics = ParquetTableMetadataUtils.addImplicitColumnsStatistics(columnsStatistics,
- source.columns, source.getPartitionValues(locationProvider), optionManager,
- locationProvider.getPath(), source.supportsFileImplicitColumns());
- }
+ columnsStatistics = getImplicitColumnStatistics(optionManager, metadata, columnsStatistics);
if (source.getNonInterestingColumnsMetadata() != null) {
columnsStatistics.putAll(source.getNonInterestingColumnsMetadata().getColumnsStatistics());
}
RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics,
TableStatisticsKind.ROW_COUNT.getValue(metadata),
- schema, schemaPathsInExpr);
+ schema, schemaPathsInExpr, udfUtilities);
if (match == RowsMatch.NONE) {
continue; // No file comply to the filter => drop the file
}
@@ -1109,6 +1106,17 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
return qualifiedMetadata;
}
protected <T extends Metadata> Map<SchemaPath, ColumnStatistics<?>> getImplicitColumnStatistics(
+ OptionManager optionManager, T metadata, Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
+ if (metadata instanceof LocationProvider && optionManager != null) {
+ LocationProvider locationProvider = (LocationProvider) metadata;
+ columnsStatistics = ParquetTableMetadataUtils.addImplicitColumnsStatistics(columnsStatistics,
+ source.columns, source.getPartitionValues(locationProvider), optionManager,
+ locationProvider.getPath(), source.supportsFileImplicitColumns());
+ }
+ return columnsStatistics;
+ }
+
protected abstract B self();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
index 09045f550f..cefd727aaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
@@ -127,6 +127,10 @@ public class DrillConstExecutor implements RexExecutor {
this.plannerSettings = plannerSettings;
}
+ public UdfUtilities getUdfUtilities() {
+ return udfUtilities;
+ }
+
@Override
@SuppressWarnings("deprecation")
public void reduce(RexBuilder rexBuilder, List<RexNode> constExps,
List<RexNode> reducedValues) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 3a3800d28c..cf1fd967a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -663,7 +663,7 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
AbstractParquetGroupScan abstractParquetGroupScan = (AbstractParquetGroupScan) source;
Map<Path, FileMetadata> filesToFilter = new HashMap<>(prunedFiles);
- if (!abstractParquetGroupScan.rowGroups.isEmpty()) {
+ if (!abstractParquetGroupScan.getRowGroupsMetadata().isEmpty()) {
prunedFiles.forEach((path, fileMetadata) -> {
if (abstractParquetGroupScan.rowGroups.get(path).size() == 1) {
omittedFiles.put(path, fileMetadata);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 460fa6cf55..52a5e4a4a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -111,6 +111,10 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
@JsonProperty
public TupleMetadata getSchema() { return schema; }
+ public boolean isImplicitColumn(SchemaPath path, String partitionColumnLabel) {
+ return path.toString().matches(partitionColumnLabel + "\\d+");
+ }
+
public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
@JsonIgnore
public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index ffba7f2316..f5c252c617 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -123,7 +123,7 @@ public abstract class AbstractParquetScanBatchCreator {
String partitionColumnLabel = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
for (SchemaPath path : schemaPathsInExpr) {
if (rowGroupScan.supportsFileImplicitColumns() &&
- path.toString().matches(partitionColumnLabel+"\\d+")) {
+ rowGroupScan.isImplicitColumn(path, partitionColumnLabel)) {
continue; // skip implicit columns like dir0, dir1
}
columnsInExpr.add(SchemaPath.getSimplePath(path.getRootSegmentPath()));
@@ -210,7 +210,7 @@ public abstract class AbstractParquetScanBatchCreator {
rowGroupSchema);
}
- matchResult = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount, rowGroupSchema, schemaPathsInExpr);
+ matchResult = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount, rowGroupSchema, schemaPathsInExpr, context);
// collect logging info
long timeToRead = pruneTimer.elapsed(TimeUnit.MICROSECONDS);
@@ -343,10 +343,7 @@ public abstract class AbstractParquetScanBatchCreator {
reader.getClass().getSimpleName());
readers.add(reader);
- List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
- Map<String, String> implicitValues =
- columnExplorer.populateColumns(rowGroup.getPath(), partitionValues,
- rowGroupScan.supportsFileImplicitColumns(), fs, rowGroup.getRowGroupIndex(), rowGroup.getStart(), rowGroup.getLength());
+ Map<String, String> implicitValues = getImplicitValues(rowGroupScan, columnExplorer, rowGroup, fs);
implicitColumns.add(implicitValues);
if (implicitValues.size() > mapWithMaxColumns.size()) {
mapWithMaxColumns = implicitValues;
@@ -354,6 +351,12 @@ public abstract class AbstractParquetScanBatchCreator {
return mapWithMaxColumns;
}
+ protected Map<String, String> getImplicitValues(AbstractParquetRowGroupScan rowGroupScan, ColumnExplorer columnExplorer, RowGroupReadEntry rowGroup, DrillFileSystem fs) {
+ List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
+ return columnExplorer.populateColumns(rowGroup.getPath(), partitionValues,
+ rowGroupScan.supportsFileImplicitColumns(), fs, rowGroup.getRowGroupIndex(), rowGroup.getStart(), rowGroup.getLength());
+ }
+
protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
private ParquetMetadata readFooter(Configuration conf, Path path,
ParquetReaderConfig readerConfig) throws IOException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
index 8d35b1ddcb..2c0942def6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
@@ -98,7 +98,7 @@ public class FilterEvaluatorUtils {
FilterPredicate<?> parquetPredicate = FilterBuilder.buildFilterPredicate(
materializedFilter, constantBoundaries, udfUtilities, true);
- return matches(parquetPredicate, columnsStatistics, rowCount, schema, schemaPathsInExpr);
+ return matches(parquetPredicate, columnsStatistics, rowCount, schema, schemaPathsInExpr, udfUtilities);
}
@SuppressWarnings("unchecked")
@@ -106,12 +106,13 @@ public class FilterEvaluatorUtils {
Map<SchemaPath, ColumnStatistics<?>> columnsStatistics,
long rowCount,
TupleMetadata fileMetadata,
- Set<SchemaPath> schemaPathsInExpr) {
+ Set<SchemaPath> schemaPathsInExpr,
+ UdfUtilities udfUtilities) {
if (parquetPredicate == null) {
return RowsMatch.SOME;
}
@SuppressWarnings("rawtypes")
- StatisticsProvider<T> rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount);
+ StatisticsProvider<T> rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount, udfUtilities);
RowsMatch rowsMatch = parquetPredicate.matches(rangeExprEvaluator);
if (rowsMatch == RowsMatch.ALL && isMetaNotApplicable(schemaPathsInExpr,
fileMetadata)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 8c91200d3e..d6fade86b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -51,12 +51,12 @@ public class ParquetScanBatchCreator extends AbstractParquetScanBatchCreator imp
/**
* Creates file system only if it was not created before, otherwise returns already created instance.
*/
- private class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
+ public static class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
private final boolean useAsyncPageReader;
private DrillFileSystem fs;
- ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) {
+ public ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) {
super(operatorContext);
this.useAsyncPageReader = useAsyncPageReader;
}
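Making ParquetDrillFileSystemManager public static lets scan batch creators outside this class reuse its create-once file system behavior instead of copying it; the Delta plugin's DeltaScanBatchCreator added by this commit appears to be the intended consumer (see the sketch after the AbstractParquetScanBatchCreator hunk above), and the useAsyncPageReader constructor flag leaves the async page reader choice with the caller.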
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
index 06384e5d3e..17572f546d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -143,6 +143,11 @@ public abstract class AbstractPluginImplementor implements PluginImplementor {
return false;
}
+ @Override
+ public boolean artificialFilter() {
+ return false;
+ }
+
private UserException getUnsupported(String rel) {
return UserException.unsupportedError()
.message("Plugin implementor doesn't support push down for %s", rel)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
index f4589a6037..7b54a6f3fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -93,4 +93,12 @@ public interface PluginImplementor {
* to ensure returning the correct rows number.
*/
boolean artificialLimit();
+
+ /**
+ * Indicates that the plugin doesn't support native filter pushdown,
+ * but its reader can still prune the set of rows to read.
+ * In this case, the filter operator on top of the scan should be preserved
+ * to ensure that the correct subset of rows is returned.
+ */
+ boolean artificialFilter();
}
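The new artificialFilter() method mirrors the artificialLimit() contract directly above it: the plugin accepts the pushed-down condition, but only as a pruning hint, so the planner must keep Drill's own filter operator above the scan. A minimal sketch of opting in, shown as a hypothetical excerpt from a plugin's AbstractPluginImplementor subclass (the rest of the class is omitted):

// Hypothetical excerpt from a plugin-specific implementor.
@Override
public boolean artificialFilter() {
  // The reader prunes files and row groups with the pushed condition, but
  // the surviving rows are a superset of the final result, so Drill
  // re-applies the filter on top of the scan (see PluginFilterRule below).
  return true;
}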
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
index e5e02e76c8..8a2cf7ca32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -24,6 +24,8 @@ import org.apache.calcite.rel.core.Filter;
import org.apache.drill.exec.store.plan.PluginImplementor;
import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import java.util.Collections;
+
/**
* The rule that converts provided filter operator to plugin-specific implementation.
*/
@@ -36,11 +38,15 @@ public class PluginFilterRule extends PluginConverterRule {
@Override
public RelNode convert(RelNode rel) {
Filter filter = (Filter) rel;
- return new PluginFilterRel(
- getOutConvention(),
- rel.getCluster(),
- filter.getTraitSet().replace(getOutConvention()),
- convert(filter.getInput(), filter.getTraitSet().replace(getOutConvention())),
- filter.getCondition());
+ PluginFilterRel pluginFilterRel = new PluginFilterRel(
+ getOutConvention(),
+ rel.getCluster(),
+ filter.getTraitSet().replace(getOutConvention()),
+ convert(filter.getInput(), filter.getTraitSet().replace(getOutConvention())),
+ filter.getCondition());
+ if (getPluginImplementor().artificialFilter()) {
+ return filter.copy(filter.getTraitSet(), Collections.singletonList(pluginFilterRel));
+ }
+ return pluginFilterRel;
}
}
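Note the shape of the resulting plan: the rule always builds the PluginFilterRel, so the condition still reaches the plugin for pruning, but when the implementor reports artificialFilter() the rule returns a copy of the original Filter with the PluginFilterRel as its only input. The predicate is therefore applied twice, once inside the reader as a best-effort prune and once by Drill to guarantee the exact row set.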
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
index a71060b5db..9df71ae0e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
public class PluginJoinRule extends PluginConverterRule {
public PluginJoinRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
- super(Join.class, in, out, "PluginProjectRule", pluginImplementor);
+ super(Join.class, in, out, "PluginJoinRule", pluginImplementor);
}
@Override