This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 1b005f379e DRILL-8542: Support Paimon Format Plugin (#3035)
1b005f379e is described below
commit 1b005f379ee6d4cbbb24c2544f46a7a57c54dde7
Author: Letian Jiang <[email protected]>
AuthorDate: Fri Jan 30 00:52:23 2026 +0800
DRILL-8542: Support Paimon Format Plugin (#3035)
---
contrib/format-paimon/README.md | 96 +++
contrib/format-paimon/pom.xml | 123 ++++
.../exec/store/paimon/PaimonCompleteWork.java | 60 ++
.../drill/exec/store/paimon/PaimonGroupScan.java | 401 ++++++++++++
.../drill/exec/store/paimon/PaimonReadUtils.java | 107 ++++
.../drill/exec/store/paimon/PaimonSubScan.java | 232 +++++++
.../drill/exec/store/paimon/PaimonTableUtils.java | 105 ++++
.../apache/drill/exec/store/paimon/PaimonWork.java | 132 ++++
.../format/PaimonFormatLocationTransformer.java | 61 ++
.../store/paimon/format/PaimonFormatMatcher.java | 85 +++
.../store/paimon/format/PaimonFormatPlugin.java | 221 +++++++
.../paimon/format/PaimonFormatPluginConfig.java | 123 ++++
.../paimon/format/PaimonMetadataFileSelection.java | 33 +
.../store/paimon/format/PaimonMetadataType.java | 54 ++
.../paimon/plan/DrillExprToPaimonTranslator.java | 341 +++++++++++
.../store/paimon/plan/PaimonPluginImplementor.java | 203 ++++++
.../paimon/read/PaimonColumnConverterFactory.java | 183 ++++++
.../exec/store/paimon/read/PaimonRecordReader.java | 215 +++++++
.../store/paimon/read/PaimonScanBatchCreator.java | 88 +++
.../main/resources/bootstrap-format-plugins.json | 20 +
.../src/main/resources/drill-module.conf | 25 +
.../drill/exec/store/paimon/PaimonQueriesTest.java | 679 +++++++++++++++++++++
contrib/pom.xml | 1 +
distribution/pom.xml | 5 +
distribution/src/assemble/component.xml | 1 +
pom.xml | 7 +
26 files changed, 3601 insertions(+)
diff --git a/contrib/format-paimon/README.md b/contrib/format-paimon/README.md
new file mode 100644
index 0000000000..224405dacc
--- /dev/null
+++ b/contrib/format-paimon/README.md
@@ -0,0 +1,96 @@
+# Apache Paimon format plugin
+
+This format plugin enables Drill to query Apache Paimon tables.
+
+Unlike regular format plugins, a Paimon table is a folder with data and metadata files; Drill checks for the presence
+of the `snapshot` and `schema` directories to confirm that the folder is a Paimon table.
+
+Drill supports reading all file formats of Paimon tables currently supported by the Paimon Java API: Parquet and ORC.
+There is no need to specify the actual table format; it is discovered automatically.
+
+For details related to the Apache Paimon table format, please refer to the [official docs](https://paimon.apache.org/).
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project pushdown optimization.
+
+With project pushdown, only the columns specified in the query are read. In conjunction with
+column-oriented formats like Parquet or ORC, this improves read performance significantly.
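+
+For example, assuming a Paimon table with columns named `id` and `name` (hypothetical names), a query such as the following reads only those two columns:
+
+```sql
+SELECT id, name
+FROM dfs.tmp.`testTable`;
+```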
+
+### Filter pushdown
+
+This format plugin supports filter pushdown optimization.
+
+With filter pushdown, expressions supported by the Paimon API are pushed down to the scan, so only data that matches
+the filter expression is read.
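+
+For example, a comparison predicate like the one below (the `id` column is a hypothetical example) is translated into a Paimon predicate, allowing non-matching data to be skipped at scan time:
+
+```sql
+SELECT *
+FROM dfs.tmp.`testTable`
+WHERE id > 100;
+```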
+
+### Limit pushdown
+
+This format plugin supports limit pushdown optimization.
+
+The limit is pushed down to Paimon scan planning to reduce the amount of data read.
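+
+For example, the limit in the following query is pushed into Paimon scan planning, so only enough data to produce ten rows is read:
+
+```sql
+SELECT *
+FROM dfs.tmp.`testTable`
+LIMIT 10;
+```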
+
+### Querying table metadata
+
+Apache Drill provides the ability to query table metadata exposed by Paimon.
+
+At this point, Apache Paimon has the following metadata kinds:
+
+* SNAPSHOTS
+* SCHEMAS
+* FILES
+* MANIFESTS
+
+To query specific metadata, append the `#metadata_name` suffix to the table location, as in the following example:
+
+```sql
+SELECT *
+FROM dfs.tmp.`testTable#snapshots`;
+```
+
+### Querying specific table versions (time travel)
+
+Apache Paimon tracks table modifications, making it possible to read a specific version of the table
+before or after a given modification.
+
+This format plugin embraces this ability and provides an easy-to-use way of triggering it.
+
+The following ways of specifying table version are supported:
+
+- `snapshotId` - id of the specific snapshot
+- `snapshotAsOfTime` - the most recent snapshot as of the given time in milliseconds
+
+A table function can be used to specify one of the above options in the following way:
+
+```sql
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotId => 1));
+
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotAsOfTime => 1736345510000));
+```
+
+Note: `snapshotId` and `snapshotAsOfTime` are mutually exclusive and cannot be specified at the same time.
+
+## Configuration
+
+The only required configuration option is:
+
+- `type` - format plugin type, should be `'paimon'`
+
+Note: `snapshotId` and `snapshotAsOfTime` for time travel queries are specified at query time using the `table()` function.
+
+### Format config example:
+
+```json
+{
+ "type": "file",
+ "formats": {
+ "paimon": {
+ "type": "paimon"
+ }
+ }
+}
+```
diff --git a/contrib/format-paimon/pom.xml b/contrib/format-paimon/pom.xml
new file mode 100644
index 0000000000..0bc02a9da0
--- /dev/null
+++ b/contrib/format-paimon/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>1.23.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-paimon-format</artifactId>
+
+ <name>Drill : Contrib : Format : Paimon</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-api</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-codegen-loader</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-format</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-shade-jackson-2</artifactId>
+ <version>2.14.2-0.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-shade-guava-30</artifactId>
+ <version>30.1.1-jre-0.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-shade-caffeine-2</artifactId>
+ <version>2.9.3-0.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-shade-netty-4</artifactId>
+ <version>4.1.100.Final-0.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ <version>0.27</version>
+ </dependency>
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>1.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>1.5.5-11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.8.4</version>
+ </dependency>
+
+ <!-- Test dependency -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
new file mode 100644
index 0000000000..7d410ae5f3
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.paimon.table.source.Split;
+
+public class PaimonCompleteWork implements CompleteWork {
+ private final EndpointByteMap byteMap;
+
+ private final Split split;
+
+ private final long totalBytes;
+
+ public PaimonCompleteWork(EndpointByteMap byteMap, Split split) {
+ this.byteMap = byteMap;
+ this.split = split;
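+ // Paimon splits expose row counts rather than byte sizes, so the row
+ // count is used as a proxy for total bytes (with a floor of 1).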
+ long rowCount = split.rowCount();
+ this.totalBytes = rowCount > 0 ? rowCount : 1;
+ }
+
+ public Split getSplit() {
+ return split;
+ }
+
+ public long getRowCount() {
+ return split.rowCount();
+ }
+
+ @Override
+ public long getTotalBytes() {
+ return totalBytes;
+ }
+
+ @Override
+ public EndpointByteMap getByteMap() {
+ return byteMap;
+ }
+
+ @Override
+ public int compareTo(CompleteWork o) {
+ return 0;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
new file mode 100644
index 0000000000..d344f9aafe
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonTypeName("paimon-scan")
+@SuppressWarnings("unused")
+public class PaimonGroupScan extends AbstractGroupScan {
+
+ private static final Logger logger = LoggerFactory.getLogger(PaimonGroupScan.class);
+
+ private final PaimonFormatPlugin formatPlugin;
+
+ private final String path;
+
+ private final TupleMetadata schema;
+
+ private final LogicalExpression condition;
+
+ private final List<SchemaPath> columns;
+
+ private int maxRecords;
+
+ private List<PaimonCompleteWork> chunks;
+
+ private List<EndpointAffinity> endpointAffinities;
+
+ private ListMultimap<Integer, PaimonCompleteWork> mappings;
+
+ @JsonCreator
+ public PaimonGroupScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("schema") TupleMetadata schema,
+ @JsonProperty("path") String path,
+ @JsonProperty("condition") LogicalExpression condition,
+ @JsonProperty("maxRecords") Integer maxRecords,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+ this(builder()
+ .userName(userName)
+ .formatPlugin(pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class))
+ .schema(schema)
+ .path(path)
+ .condition(condition)
+ .columns(columns)
+ .maxRecords(maxRecords));
+ }
+
+ private PaimonGroupScan(PaimonGroupScanBuilder builder) throws IOException {
+ super(builder.userName);
+ this.formatPlugin = builder.formatPlugin;
+ this.columns = builder.columns;
+ this.path = builder.path;
+ this.schema = builder.schema;
+ this.condition = builder.condition;
+ this.maxRecords = builder.maxRecords;
+
+ init();
+ }
+
+ private PaimonGroupScan(PaimonGroupScan that) {
+ super(that);
+ this.columns = that.columns;
+ this.formatPlugin = that.formatPlugin;
+ this.path = that.path;
+ this.condition = that.condition;
+ this.schema = that.schema;
+ this.maxRecords = that.maxRecords;
+ this.chunks = that.chunks;
+ this.endpointAffinities = that.endpointAffinities;
+ this.mappings = that.mappings;
+ }
+
+ public static PaimonGroupScanBuilder builder() {
+ return new PaimonGroupScanBuilder();
+ }
+
+ @Override
+ public PaimonGroupScan clone(List<SchemaPath> columns) {
+ try {
+ return toBuilder().columns(columns).build();
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .message("Failed to clone Paimon group scan")
+ .build(logger);
+ }
+ }
+
+ @Override
+ public PaimonGroupScan applyLimit(int maxRecords) {
+ PaimonGroupScan clone = new PaimonGroupScan(this);
+ clone.maxRecords = maxRecords;
+ return clone;
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+ mappings = AssignmentCreator.getMappings(endpoints, chunks);
+ }
+
+ private void createMappings(List<EndpointAffinity> affinities) {
+ List<DrillbitEndpoint> endpoints = affinities.stream()
+ .map(EndpointAffinity::getEndpoint)
+ .collect(Collectors.toList());
+ applyAssignments(endpoints);
+ }
+
+ @Override
+ public PaimonSubScan getSpecificScan(int minorFragmentId) {
+ if (chunks.isEmpty()) {
+ return emptySubScan();
+ }
+
+ if (mappings == null) {
+ createMappings(endpointAffinities);
+ }
+
+ List<PaimonCompleteWork> workList = mappings.get(minorFragmentId);
+ List<PaimonWork> paimonWorkList = workList == null
+ ? Collections.emptyList()
+ : convertWorkList(workList);
+
+ PaimonSubScan subScan = PaimonSubScan.builder()
+ .userName(userName)
+ .formatPlugin(formatPlugin)
+ .columns(columns)
+ .condition(condition)
+ .schema(schema)
+ .workList(paimonWorkList)
+ .path(path)
+ .maxRecords(maxRecords)
+ .build();
+
+ subScan.setOperatorId(getOperatorId());
+ return subScan;
+ }
+
+ private PaimonSubScan emptySubScan() {
+ PaimonSubScan subScan = PaimonSubScan.builder()
+ .userName(userName)
+ .formatPlugin(formatPlugin)
+ .columns(columns)
+ .condition(condition)
+ .schema(schema)
+ .workList(Collections.emptyList())
+ .path(path)
+ .maxRecords(maxRecords)
+ .build();
+ subScan.setOperatorId(getOperatorId());
+ return subScan;
+ }
+
+ private List<PaimonWork> convertWorkList(List<PaimonCompleteWork> workList) {
+ return workList.stream()
+ .map(PaimonCompleteWork::getSplit)
+ .map(PaimonWork::new)
+ .collect(Collectors.toList());
+ }
+
+ @JsonProperty("maxRecords")
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return Math.max(chunks.size(), 1);
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ long rowCount = chunks.stream()
+ .mapToLong(PaimonCompleteWork::getRowCount)
+ .sum();
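+ // Fall back to a rough estimate of one million rows per split when
+ // split row counts are unavailable.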
+ long estimatedRecords = rowCount > 0
+ ? rowCount
+ : Math.max(chunks.size(), 1) * 1_000_000L;
+ return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, estimatedRecords, 1, 0);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new PaimonGroupScan(this);
+ }
+
+ private void init() throws IOException {
+ Table table = PaimonTableUtils.loadTable(formatPlugin, path);
+ String fileFormat = new CoreOptions(table.options()).fileFormatString();
+ // Paimon supports multiple formats; Drill currently reads Parquet/ORC only.
+ if (!"parquet".equalsIgnoreCase(fileFormat) && !"orc".equalsIgnoreCase(fileFormat)) {
+ throw UserException.unsupportedError()
+ .message("Paimon file format '%s' is not supported. Only parquet and orc are supported.", fileFormat)
+ .build(logger);
+ }
+
+ RowType rowType = table.rowType();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ PaimonReadUtils.applyFilter(readBuilder, rowType, condition);
+ PaimonReadUtils.applyProjection(readBuilder, rowType, columns);
+ TableScan tableScan = readBuilder.newScan();
+ List<Split> splits = tableScan.plan().splits();
+ chunks = splits.stream()
+ .map(split -> new PaimonCompleteWork(new EndpointByteMapImpl(), split))
+ .collect(Collectors.toList());
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ if (endpointAffinities == null) {
+ logger.debug("Chunks size: {}", chunks.size());
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ }
+ return endpointAffinities;
+ }
+
+ @Override
+ public boolean supportsLimitPushdown() {
+ return true;
+ }
+
+ @Override
+ @JsonProperty("columns")
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @JsonProperty("schema")
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getStorageConfig() {
+ return formatPlugin.getStorageConfig();
+ }
+
+ @JsonProperty("format")
+ public FormatPluginConfig getFormatConfig() {
+ return formatPlugin.getConfig();
+ }
+
+ @JsonProperty("path")
+ public String getPath() {
+ return path;
+ }
+
+ @JsonProperty("condition")
+ public LogicalExpression getCondition() {
+ return condition;
+ }
+
+ @JsonIgnore
+ public PaimonFormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ @Override
+ public String getOperatorType() {
+ return "PAIMON_GROUP_SCAN";
+ }
+
+ @Override
+ public String toString() {
+ String conditionString = condition == null ? null : ExpressionStringBuilder.toString(condition).trim();
+ return new PlanStringBuilder(this)
+ .field("path", path)
+ .field("schema", schema)
+ .field("columns", columns)
+ .field("condition", conditionString)
+ .field("maxRecords", maxRecords)
+ .toString();
+ }
+
+ public PaimonGroupScanBuilder toBuilder() {
+ return new PaimonGroupScanBuilder()
+ .userName(this.userName)
+ .formatPlugin(this.formatPlugin)
+ .schema(this.schema)
+ .path(this.path)
+ .condition(this.condition)
+ .columns(this.columns)
+ .maxRecords(this.maxRecords);
+ }
+
+ public static class PaimonGroupScanBuilder {
+ private String userName;
+
+ private PaimonFormatPlugin formatPlugin;
+
+ private TupleMetadata schema;
+
+ private String path;
+
+ private LogicalExpression condition;
+
+ private List<SchemaPath> columns;
+
+ private int maxRecords;
+
+ public PaimonGroupScanBuilder userName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder schema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder path(String path) {
+ this.path = path;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder condition(LogicalExpression condition) {
+ this.condition = condition;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder columns(List<SchemaPath> columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder maxRecords(int maxRecords) {
+ this.maxRecords = maxRecords;
+ return this;
+ }
+
+ public PaimonGroupScan build() throws IOException {
+ return new PaimonGroupScan(this);
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java
new file mode 100644
index 0000000000..1c69642265
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.paimon.plan.DrillExprToPaimonTranslator;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Utility class for common Paimon read operations such as applying filters
+ * and projections to ReadBuilder instances.
+ */
+public final class PaimonReadUtils {
+ private static final Logger logger = LoggerFactory.getLogger(PaimonReadUtils.class);
+
+ private PaimonReadUtils() {
+ // Utility class
+ }
+
+ /**
+ * Applies a filter expression to the Paimon ReadBuilder.
+ *
+ * @param readBuilder the Paimon ReadBuilder
+ * @param rowType the table row type
+ * @param condition the filter condition
+ */
+ public static void applyFilter(ReadBuilder readBuilder, RowType rowType, LogicalExpression condition) {
+ if (condition == null) {
+ return;
+ }
+ // Translate Drill expression into a Paimon predicate (if supported).
+ Predicate predicate = DrillExprToPaimonTranslator.translate(condition, rowType);
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+ }
+
+ /**
+ * Applies column projection to the Paimon ReadBuilder.
+ *
+ * @param readBuilder the Paimon ReadBuilder
+ * @param rowType the table row type
+ * @param columns the columns to project
+ */
+ public static void applyProjection(ReadBuilder readBuilder, RowType rowType, List<SchemaPath> columns) {
+ if (columns == null || columns.isEmpty()) {
+ return;
+ }
+
+ boolean hasStar = columns.stream().anyMatch(SchemaPath::isDynamicStar);
+ if (hasStar) {
+ return;
+ }
+
+ Set<String> projectedNames = new HashSet<>();
+ List<Integer> projection = new ArrayList<>();
+ for (SchemaPath column : columns) {
+ PathSegment segment = column.getRootSegment();
+ if (segment == null || !segment.isNamed()) {
+ continue;
+ }
+ String name = segment.getNameSegment().getPath();
+ if (!projectedNames.add(name)) {
+ continue;
+ }
+ int index = rowType.getFieldIndex(name);
+ if (index < 0) {
+ throw UserException.validationError()
+ .message("Paimon column not found: %s", name)
+ .build(logger);
+ }
+ projection.add(index);
+ }
+
+ if (!projection.isEmpty()) {
+ int[] projectionArray = projection.stream().mapToInt(Integer::intValue).toArray();
+ readBuilder.withProjection(projectionArray);
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java
new file mode 100644
index 0000000000..6f94462c1c
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("paimon-read")
+@SuppressWarnings("unused")
+public class PaimonSubScan extends AbstractBase implements SubScan {
+
+ private static final String OPERATOR_TYPE = "PAIMON_SUB_SCAN";
+
+ private final PaimonFormatPlugin formatPlugin;
+
+ private final List<SchemaPath> columns;
+
+ private final LogicalExpression condition;
+
+ private final TupleMetadata schema;
+
+ private final List<PaimonWork> workList;
+
+ private final String path;
+
+ private final int maxRecords;
+
+ @JsonCreator
+ public PaimonSubScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("path") String path,
+ @JsonProperty("workList") List<PaimonWork> workList,
+ @JsonProperty("schema") TupleMetadata schema,
+ @JsonProperty("condition") LogicalExpression condition,
+ @JsonProperty("maxRecords") Integer maxRecords,
+ @JacksonInject StoragePluginRegistry pluginRegistry) {
+ this.formatPlugin = pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class);
+ this.columns = columns;
+ this.workList = workList;
+ this.path = path;
+ this.condition = condition;
+ this.schema = schema;
+ this.maxRecords = maxRecords;
+ }
+
+ private PaimonSubScan(PaimonSubScanBuilder builder) {
+ super(builder.userName);
+ this.formatPlugin = builder.formatPlugin;
+ this.columns = builder.columns;
+ this.condition = builder.condition;
+ this.schema = builder.schema;
+ this.workList = builder.workList;
+ this.path = builder.path;
+ this.maxRecords = builder.maxRecords;
+ }
+
+ public static PaimonSubScanBuilder builder() {
+ return new PaimonSubScanBuilder();
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(
+ PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ public List<PaimonWork> getWorkList() {
+ return workList;
+ }
+
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ public LogicalExpression getCondition() {
+ return condition;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return this.toBuilder().build();
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getStorageConfig() {
+ return formatPlugin.getStorageConfig();
+ }
+
+ @JsonProperty("format")
+ public FormatPluginConfig getFormatConfig() {
+ return formatPlugin.getConfig();
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public String getOperatorType() {
+ return OPERATOR_TYPE;
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Collections.emptyIterator();
+ }
+
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
+ @JsonIgnore
+ public PaimonFormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ public PaimonSubScanBuilder toBuilder() {
+ return new PaimonSubScanBuilder()
+ .userName(this.userName)
+ .formatPlugin(this.formatPlugin)
+ .columns(this.columns)
+ .condition(this.condition)
+ .schema(this.schema)
+ .workList(this.workList)
+ .path(this.path)
+ .maxRecords(this.maxRecords);
+ }
+
+ public static class PaimonSubScanBuilder {
+ private String userName;
+
+ private PaimonFormatPlugin formatPlugin;
+
+ private List<SchemaPath> columns;
+
+ private LogicalExpression condition;
+
+ private TupleMetadata schema;
+
+ private List<PaimonWork> workList;
+
+ private String path;
+
+ private int maxRecords;
+
+ public PaimonSubScanBuilder userName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public PaimonSubScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return this;
+ }
+
+ public PaimonSubScanBuilder columns(List<SchemaPath> columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ public PaimonSubScanBuilder condition(LogicalExpression condition) {
+ this.condition = condition;
+ return this;
+ }
+
+ public PaimonSubScanBuilder schema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public PaimonSubScanBuilder workList(List<PaimonWork> workList) {
+ this.workList = workList;
+ return this;
+ }
+
+ public PaimonSubScanBuilder path(String path) {
+ this.path = path;
+ return this;
+ }
+
+ public PaimonSubScanBuilder maxRecords(int maxRecords) {
+ this.maxRecords = maxRecords;
+ return this;
+ }
+
+ public PaimonSubScan build() {
+ return new PaimonSubScan(this);
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java
new file mode 100644
index 0000000000..6979799a3b
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatLocationTransformer;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig;
+import org.apache.drill.exec.store.paimon.format.PaimonMetadataType;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.system.SystemTableLoader;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class PaimonTableUtils {
+ private PaimonTableUtils() {
+ }
+
+ /**
+ * Load a Paimon table directly from a filesystem path. If a metadata suffix is present,
+ * returns the corresponding system table; otherwise returns the data table.
+ */
+ public static Table loadTable(PaimonFormatPlugin formatPlugin, String path) throws IOException {
+ PaimonMetadataType metadataType = extractMetadataType(path);
+ String tableLocation = metadataType == null ? path : stripMetadataType(path);
+ Path tablePath = new Path(tableLocation);
+ Options options = new Options();
+ CatalogContext context = CatalogContext.create(options, formatPlugin.getFsConf());
+ FileIO fileIO = FileIO.get(tablePath, context);
+ FileStoreTable table = FileStoreTableFactory.create(fileIO, tablePath);
+ // Apply time-travel and custom options at table load time.
+ Map<String, String> dynamicOptions = buildDynamicOptions(formatPlugin.getConfig());
+ if (!dynamicOptions.isEmpty()) {
+ table = table.copy(dynamicOptions);
+ }
+ if (metadataType == null) {
+ return table;
+ }
+ Table metadataTable = SystemTableLoader.load(metadataType.getName(), table);
+ Preconditions.checkArgument(metadataTable != null, "Unsupported metadata table: %s", metadataType.getName());
+ return metadataTable;
+ }
+
+ private static PaimonMetadataType extractMetadataType(String location) {
+ int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR);
+ if (index < 0) {
+ return null;
+ }
+ String metadataName = location.substring(index + 1);
+ return PaimonMetadataType.from(metadataName);
+ }
+
+ private static String stripMetadataType(String location) {
+ int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR);
+ return index < 0 ? location : location.substring(0, index);
+ }
+
+ private static Map<String, String> buildDynamicOptions(PaimonFormatPluginConfig config) {
+ Map<String, String> dynamicOptions = new HashMap<>();
+ if (config == null) {
+ return dynamicOptions;
+ }
+ if (config.getProperties() != null) {
+ dynamicOptions.putAll(config.getProperties());
+ }
+
+ Long snapshotId = config.getSnapshotId();
+ Long snapshotAsOfTime = config.getSnapshotAsOfTime();
+ Preconditions.checkArgument(snapshotId == null || snapshotAsOfTime == null,
+ "Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified");
+ if (snapshotId != null) {
+ dynamicOptions.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_SNAPSHOT.toString());
+ dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId.toString());
+ } else if (snapshotAsOfTime != null) {
+ dynamicOptions.put(CoreOptions.SCAN_MODE.key(),
CoreOptions.StartupMode.FROM_TIMESTAMP.toString());
+ dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
snapshotAsOfTime.toString());
+ }
+
+ return dynamicOptions;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java
new file mode 100644
index 0000000000..ff06d60de8
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.paimon.table.source.Split;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.Objects;
+
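+/**
+ * A unit of scan work wrapping a Paimon {@link Split}. Splits do not map
+ * cleanly to JSON, so they are carried in the serialized plan as
+ * Base64-encoded Java serialization via the custom serializer and
+ * deserializer below.
+ */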
+@JsonSerialize(using = PaimonWork.PaimonWorkSerializer.class)
+@JsonDeserialize(using = PaimonWork.PaimonWorkDeserializer.class)
+public class PaimonWork {
+ private final Split split;
+
+ public PaimonWork(Split split) {
+ this.split = split;
+ }
+
+ public Split getSplit() {
+ return split;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonWork that = (PaimonWork) o;
+ return Objects.equals(split, that.split);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(split);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("split", split)
+ .toString();
+ }
+
+ public static class PaimonWorkDeserializer extends StdDeserializer<PaimonWork> {
+
+ private static final Logger logger = LoggerFactory.getLogger(PaimonWorkDeserializer.class);
+
+ public PaimonWorkDeserializer() {
+ super(PaimonWork.class);
+ }
+
+ @Override
+ public PaimonWork deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ JsonNode node = p.getCodec().readTree(p);
+ String splitString = node.get(PaimonWorkSerializer.SPLIT_FIELD).asText();
+
+ byte[] decoded = Base64.getDecoder().decode(splitString);
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(decoded))) {
+ Object split = ois.readObject();
+ if (!(split instanceof Split)) {
+ throw UserException.dataReadError()
+ .message("Deserialized object is not a Paimon Split: %s",
split.getClass().getName())
+ .build(logger);
+ }
+ return new PaimonWork((Split) split);
+ } catch (ClassNotFoundException e) {
+ logger.error("Failed to deserialize Paimon Split: {}", e.getMessage(),
e);
+ throw UserException.dataReadError(e)
+ .message("Failed to deserialize Paimon Split: %s", e.getMessage())
+ .build(logger);
+ }
+ }
+ }
+
+ public static class PaimonWorkSerializer extends StdSerializer<PaimonWork> {
+
+ public static final String SPLIT_FIELD = "split";
+
+ public PaimonWorkSerializer() {
+ super(PaimonWork.class);
+ }
+
+ @Override
+ public void serialize(PaimonWork value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeStartObject();
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(value.split);
+ oos.flush();
+ gen.writeStringField(SPLIT_FIELD, Base64.getEncoder().encodeToString(baos.toByteArray()));
+ }
+ gen.writeEndObject();
+ }
+ }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java
new file mode 100644
index 0000000000..43b0e58854
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatLocationTransformer;
+
+import java.util.function.Function;
+
+public class PaimonFormatLocationTransformer implements FormatLocationTransformer {
+ public static final FormatLocationTransformer INSTANCE = new PaimonFormatLocationTransformer();
+
+ // Paimon metadata tables are addressed via suffix: e.g. /path/to/table#snapshots
+ public static final String METADATA_SEPARATOR = "#";
+
+ @Override
+ public boolean canTransform(String location) {
+ PaimonMetadataType metadataType = getMetadataType(location);
+ if (metadataType == null) {
+ return false;
+ }
+ return true;
+ }
+
+ private PaimonMetadataType getMetadataType(String location) {
+ String metadataType = StringUtils.substringAfterLast(location, METADATA_SEPARATOR);
+ if (StringUtils.isNotEmpty(metadataType)) {
+ return PaimonMetadataType.from(metadataType);
+ }
+ return null;
+ }
+
+ @Override
+ public FileSelection transform(String location, Function<String, FileSelection> selectionFactory) {
+ PaimonMetadataType metadataType = getMetadataType(location);
+ location = StringUtils.substringBeforeLast(location, METADATA_SEPARATOR);
+ FileSelection fileSelection = selectionFactory.apply(location);
+ if (fileSelection == null) {
+ return null;
+ }
+ // Preserve metadata type while keeping the base path selection.
+ return new PaimonMetadataFileSelection(fileSelection, metadataType);
+ }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java
new file mode 100644
index 0000000000..bacdbe3c73
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatLocationTransformer;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public class PaimonFormatMatcher extends FormatMatcher {
+ private static final String SNAPSHOT_DIR_NAME = "snapshot";
+ private static final String SCHEMA_DIR_NAME = "schema";
+
+ private final PaimonFormatPlugin formatPlugin;
+
+ public PaimonFormatMatcher(PaimonFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ }
+
+ @Override
+ public boolean supportDirectoryReads() {
+ return true;
+ }
+
+ @Override
+ public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, FileSystemPlugin fsPlugin,
+ String storageEngineName, SchemaConfig schemaConfig) throws IOException {
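+ // A directory is treated as a Paimon table when it contains both the
+ // 'snapshot' and 'schema' subdirectories.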
+ Path selectionRoot = selection.getSelectionRoot();
+ Path snapshotDir = new Path(selectionRoot, SNAPSHOT_DIR_NAME);
+ Path schemaDir = new Path(selectionRoot, SCHEMA_DIR_NAME);
+ if (fs.isDirectory(selectionRoot)
+ && fs.exists(snapshotDir) && fs.isDirectory(snapshotDir)
+ && fs.exists(schemaDir) && fs.isDirectory(schemaDir)) {
+ FormatSelection formatSelection = new FormatSelection(formatPlugin.getConfig(), selection);
+ return new PluginDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
+ formatSelection, formatPlugin.getConvention());
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isFileReadable(DrillFileSystem fs, FileStatus status) {
+ return false;
+ }
+
+ @Override
+ public FormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ @Override
+ public int priority() {
+ return HIGH_PRIORITY;
+ }
+
+ @Override
+ public FormatLocationTransformer getFormatLocationTransformer() {
+ return PaimonFormatLocationTransformer.INSTANCE;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java
new file mode 100644
index 0000000000..d778b3f88c
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PaimonFormatPlugin implements FormatPlugin {
+
+ private static final String PAIMON_CONVENTION_PREFIX = "PAIMON.";
+
+ private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+ private final FileSystemConfig storageConfig;
+
+ private final PaimonFormatPluginConfig config;
+
+ private final Configuration fsConf;
+
+ private final DrillbitContext context;
+
+ private final String name;
+
+ private final PaimonFormatMatcher matcher;
+
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+ public PaimonFormatPlugin(
+ String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ FileSystemConfig storageConfig,
+ PaimonFormatPluginConfig config) {
+ this.storageConfig = storageConfig;
+ this.config = config;
+ this.fsConf = fsConf;
+ this.context = context;
+ this.name = name;
+ this.matcher = new PaimonFormatMatcher(this);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+ Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX + name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention, PaimonPluginImplementor::new))
+ .supportsFilterPushdown(true)
+ .supportsProjectPushdown(true)
+ .supportsLimitPushdown(true)
+ .convention(convention)
+ .build();
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+ return PaimonGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .path(getPath(selection))
+ .columns(columns)
+ .maxRecords(-1)
+ .build();
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+ SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+ TupleMetadata schema = null;
+ if (schemaProvider != null) {
+ schema = schemaProvider.read().getSchema();
+ }
+ return PaimonGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .schema(schema)
+ .path(getPath(selection))
+ .columns(columns)
+ .maxRecords(-1)
+ .build();
+ }
+
+ @Override
+ public boolean supportsStatistics() {
+ return false;
+ }
+
+ @Override
+ public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public PaimonFormatPluginConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public FileSystemConfig getStorageConfig() {
+ return storageConfig;
+ }
+
+ @Override
+ public Configuration getFsConf() {
+ return fsConf;
+ }
+
+ @Override
+ public DrillbitContext getContext() {
+ return context;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ public Convention getConvention() {
+ return storagePluginRulesSupplier.convention();
+ }
+
+ private String getPath(FileSelection selection) {
+ String path = selection.getSelectionRoot().toString();
+ if (selection instanceof PaimonMetadataFileSelection) {
+ PaimonMetadataFileSelection metadataFileSelection = (PaimonMetadataFileSelection) selection;
+ // Map dfs.`/path/table#snapshots` to a Paimon system table.
+ path = String.format("%s%s%s", path, PaimonFormatLocationTransformer.METADATA_SEPARATOR,
+ metadataFileSelection.getMetadataType().getName());
+ }
+ return path;
+ }
+
+}
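
For illustration, the getPath() mapping above just appends the metadata-type suffix to the selection root. A minimal sketch, assuming the `#` separator implied by the system-table examples and a hypothetical table root:

    // Hypothetical root "/tmp/users"; the suffix selects the snapshots system table.
    // "/tmp/users" + "#" + "snapshots" -> "/tmp/users#snapshots"
    String systemTablePath = "/tmp/users"
        + PaimonFormatLocationTransformer.METADATA_SEPARATOR
        + PaimonMetadataType.SNAPSHOTS.getName();
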
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java
new file mode 100644
index 0000000000..f30c0b4f82
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(PaimonFormatPluginConfig.NAME)
+@JsonDeserialize(builder = PaimonFormatPluginConfig.PaimonFormatPluginConfigBuilder.class)
+public class PaimonFormatPluginConfig implements FormatPluginConfig {
+
+ public static final String NAME = "paimon";
+
+ private final Map<String, String> properties;
+
+ // Time travel: load a specific snapshot id.
+ private final Long snapshotId;
+
+ // Time travel: load the latest snapshot at or before the given timestamp (millis).
+ private final Long snapshotAsOfTime;
+
+ @JsonCreator
+ public PaimonFormatPluginConfig(PaimonFormatPluginConfigBuilder builder) {
+ this.properties = builder.properties;
+ this.snapshotId = builder.snapshotId;
+ this.snapshotAsOfTime = builder.snapshotAsOfTime;
+ }
+
+ public static PaimonFormatPluginConfigBuilder builder() {
+ return new PaimonFormatPluginConfigBuilder();
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public Long getSnapshotId() {
+ return snapshotId;
+ }
+
+ public Long getSnapshotAsOfTime() {
+ return snapshotAsOfTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonFormatPluginConfig that = (PaimonFormatPluginConfig) o;
+ return Objects.equals(properties, that.properties)
+ && Objects.equals(snapshotId, that.snapshotId)
+ && Objects.equals(snapshotAsOfTime, that.snapshotAsOfTime);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, snapshotId, snapshotAsOfTime);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("properties", properties)
+ .field("snapshotId", snapshotId)
+ .field("snapshotAsOfTime", snapshotAsOfTime)
+ .toString();
+ }
+
+ @JsonPOJOBuilder(withPrefix = "")
+ public static class PaimonFormatPluginConfigBuilder {
+ private Map<String, String> properties;
+
+ private Long snapshotId;
+
+ private Long snapshotAsOfTime;
+
+ public PaimonFormatPluginConfigBuilder properties(Map<String, String> properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ public PaimonFormatPluginConfigBuilder snapshotId(Long snapshotId) {
+ this.snapshotId = snapshotId;
+ return this;
+ }
+
+ public PaimonFormatPluginConfigBuilder snapshotAsOfTime(Long snapshotAsOfTime) {
+ this.snapshotAsOfTime = snapshotAsOfTime;
+ return this;
+ }
+
+ public PaimonFormatPluginConfig build() {
+ return new PaimonFormatPluginConfig(this);
+ }
+ }
+
+}
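
As a usage sketch of the builder above (the snapshot values are made up for illustration; only methods defined in this file are used):

    // Time travel by snapshot id...
    PaimonFormatPluginConfig byId = PaimonFormatPluginConfig.builder()
        .snapshotId(3L)
        .build();
    // ...or by the latest snapshot at or before an epoch-millis timestamp.
    PaimonFormatPluginConfig byTime = PaimonFormatPluginConfig.builder()
        .snapshotAsOfTime(1700000000000L)
        .build();
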
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java
new file mode 100644
index 0000000000..ed90d18d92
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.drill.exec.store.dfs.FileSelection;
+
+public class PaimonMetadataFileSelection extends FileSelection {
+ private final PaimonMetadataType metadataType;
+
+ protected PaimonMetadataFileSelection(FileSelection selection, PaimonMetadataType metadataType) {
+ super(selection);
+ this.metadataType = metadataType;
+ }
+
+ public PaimonMetadataType getMetadataType() {
+ return metadataType;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java
new file mode 100644
index 0000000000..0cdec7345c
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import java.util.Locale;
+
+/**
+ * Paimon system tables exposed via table path suffix, e.g. /path/table#snapshots.
+ */
+public enum PaimonMetadataType {
+ SNAPSHOTS("snapshots"),
+ SCHEMAS("schemas"),
+ FILES("files"),
+ MANIFESTS("manifests");
+
+ private final String name;
+
+ PaimonMetadataType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static PaimonMetadataType from(String value) {
+ if (value == null) {
+ return null;
+ }
+ String normalized = value.toLowerCase(Locale.ROOT);
+ for (PaimonMetadataType type : values()) {
+ if (type.name.equals(normalized)) {
+ return type;
+ }
+ }
+ return null;
+ }
+
+}
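
For example, from() resolves the suffix of dfs.`/path/table#snapshots` case-insensitively (a small sketch using only this enum):

    // Matching is case-insensitive; unknown suffixes yield null rather than throwing.
    PaimonMetadataType snapshots = PaimonMetadataType.from("SNAPSHOTS"); // SNAPSHOTS
    PaimonMetadataType unknown = PaimonMetadataType.from("partitions");  // null
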
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java
new file mode 100644
index 0000000000..16942fe4c8
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.plan;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.NullExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Optional;
+
+public class DrillExprToPaimonTranslator
+ extends AbstractExprVisitor<Object, DrillExprToPaimonTranslator.Context, RuntimeException> {
+
+ // Translate Drill logical expressions into Paimon predicates for filter pushdown.
+ public static final ExprVisitor<Object, Context, RuntimeException> INSTANCE = new DrillExprToPaimonTranslator();
+
+ public static Predicate translate(LogicalExpression expression, RowType rowType) {
+ if (expression == null || rowType == null) {
+ return null;
+ }
+ Object value = expression.accept(INSTANCE, new Context(rowType));
+ return value instanceof Predicate ? (Predicate) value : null;
+ }
+
+ @Override
+ public Object visitBooleanOperator(BooleanOperator op, Context context) {
+ if (!FunctionNames.AND.equals(op.getName()) && !FunctionNames.OR.equals(op.getName())) {
+ return null;
+ }
+ Predicate result = null;
+ for (LogicalExpression arg : op.args()) {
+ Predicate predicate = asPredicate(arg.accept(this, context));
+ if (predicate == null) {
+ return null;
+ }
+ if (result == null) {
+ result = predicate;
+ } else {
+ result = FunctionNames.AND.equals(op.getName())
+ ? PredicateBuilder.and(result, predicate)
+ : PredicateBuilder.or(result, predicate);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Object visitFunctionCall(FunctionCall call, Context context) {
+ switch (call.getName()) {
+ case FunctionNames.AND: {
+ Predicate left = asPredicate(call.arg(0).accept(this, context));
+ Predicate right = asPredicate(call.arg(1).accept(this, context));
+ if (left != null && right != null) {
+ return PredicateBuilder.and(left, right);
+ }
+ return null;
+ }
+ case FunctionNames.OR: {
+ Predicate left = asPredicate(call.arg(0).accept(this, context));
+ Predicate right = asPredicate(call.arg(1).accept(this, context));
+ if (left != null && right != null) {
+ return PredicateBuilder.or(left, right);
+ }
+ return null;
+ }
+ case FunctionNames.NOT: {
+ Predicate predicate = asPredicate(call.arg(0).accept(this, context));
+ if (predicate == null) {
+ return null;
+ }
+ Optional<Predicate> negated = predicate.negate();
+ return negated.orElse(null);
+ }
+ case FunctionNames.IS_NULL: {
+ Object arg = call.arg(0).accept(this, context);
+ if (arg instanceof SchemaPath) {
+ return buildIsNullPredicate(context, (SchemaPath) arg, true);
+ }
+ return null;
+ }
+ case FunctionNames.IS_NOT_NULL: {
+ Object arg = call.arg(0).accept(this, context);
+ if (arg instanceof SchemaPath) {
+ return buildIsNullPredicate(context, (SchemaPath) arg, false);
+ }
+ return null;
+ }
+ case FunctionNames.LT:
+ return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.LT);
+ case FunctionNames.LE:
+ return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.LE);
+ case FunctionNames.GT:
+ return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.GT);
+ case FunctionNames.GE:
+ return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.GE);
+ case FunctionNames.EQ:
+ return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.EQ);
+ case FunctionNames.NE:
+ return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.NE);
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public Object visitSchemaPath(SchemaPath path, Context context) {
+ return path;
+ }
+
+ @Override
+ public Object visitBooleanConstant(ValueExpressions.BooleanExpression e, Context context) {
+ return new LiteralValue(e.getBoolean());
+ }
+
+ @Override
+ public Object visitFloatConstant(ValueExpressions.FloatExpression fExpr, Context context) {
+ return new LiteralValue(fExpr.getFloat());
+ }
+
+ @Override
+ public Object visitIntConstant(ValueExpressions.IntExpression intExpr, Context context) {
+ return new LiteralValue(intExpr.getInt());
+ }
+
+ @Override
+ public Object visitLongConstant(ValueExpressions.LongExpression longExpr, Context context) {
+ return new LiteralValue(longExpr.getLong());
+ }
+
+ @Override
+ public Object visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Context context) {
+ return new LiteralValue(dExpr.getDouble());
+ }
+
+ @Override
+ public Object visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Context context) {
+ return new LiteralValue(BigDecimal.valueOf(decExpr.getIntFromDecimal(), decExpr.getScale()));
+ }
+
+ @Override
+ public Object visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Context context) {
+ return new LiteralValue(BigDecimal.valueOf(decExpr.getLongFromDecimal(), decExpr.getScale()));
+ }
+
+ @Override
+ public Object visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Context context) {
+ return new LiteralValue(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Object visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Context context) {
+ return new LiteralValue(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Object visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Context context) {
+ return new LiteralValue(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Object visitDateConstant(ValueExpressions.DateExpression dateExpr, Context context) {
+ return new LiteralValue(new Date(dateExpr.getDate()));
+ }
+
+ @Override
+ public Object visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Context context) {
+ return new LiteralValue(new Time(timeExpr.getTime()));
+ }
+
+ @Override
+ public Object visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Context context) {
+ return new LiteralValue(new Timestamp(timestampExpr.getTimeStamp()));
+ }
+
+ @Override
+ public Object visitQuotedStringConstant(ValueExpressions.QuotedString e, Context context) {
+ return new LiteralValue(e.getString());
+ }
+
+ @Override
+ public Object visitNullExpression(NullExpression e, Context context) {
+ return new LiteralValue(null);
+ }
+
+ @Override
+ public Object visitUnknown(LogicalExpression e, Context context) {
+ return null;
+ }
+
+ private Predicate buildComparisonPredicate(Context context, LogicalExpression leftExpr,
+ LogicalExpression rightExpr, Comparison comparison) {
+ Object left = leftExpr.accept(this, context);
+ Object right = rightExpr.accept(this, context);
+ return buildComparisonPredicate(context, left, right, comparison);
+ }
+
+ private Predicate buildComparisonPredicate(Context context, Object left, Object right, Comparison comparison) {
+ if (left instanceof SchemaPath && right instanceof LiteralValue) {
+ return buildPredicate(context, (SchemaPath) left, comparison, (LiteralValue) right);
+ }
+ if (right instanceof SchemaPath && left instanceof LiteralValue) {
+ return buildPredicate(context, (SchemaPath) right, comparison.flip(), (LiteralValue) left);
+ }
+ return null;
+ }
+
+ private Predicate buildPredicate(Context context, SchemaPath path, Comparison comparison, LiteralValue literalValue) {
+ int index = columnIndex(context, path);
+ if (index < 0) {
+ return null;
+ }
+ Object value = literalValue.value();
+ if (value == null) {
+ return null;
+ }
+ DataField field = context.rowType.getFields().get(index);
+ Object internalValue;
+ try {
+ internalValue = PredicateBuilder.convertJavaObject(field.type(), value);
+ } catch (RuntimeException e) {
+ return null;
+ }
+ switch (comparison) {
+ case EQ:
+ return context.predicateBuilder.equal(index, internalValue);
+ case NE:
+ return context.predicateBuilder.notEqual(index, internalValue);
+ case LT:
+ return context.predicateBuilder.lessThan(index, internalValue);
+ case LE:
+ return context.predicateBuilder.lessOrEqual(index, internalValue);
+ case GT:
+ return context.predicateBuilder.greaterThan(index, internalValue);
+ case GE:
+ return context.predicateBuilder.greaterOrEqual(index, internalValue);
+ default:
+ return null;
+ }
+ }
+
+ private Predicate buildIsNullPredicate(Context context, SchemaPath path, boolean isNull) {
+ int index = columnIndex(context, path);
+ if (index < 0) {
+ return null;
+ }
+ return isNull
+ ? context.predicateBuilder.isNull(index)
+ : context.predicateBuilder.isNotNull(index);
+ }
+
+ private int columnIndex(Context context, SchemaPath path) {
+ PathSegment segment = path.getRootSegment();
+ if (segment == null || !segment.isNamed() || segment.getChild() != null) {
+ return -1;
+ }
+ return context.predicateBuilder.indexOf(segment.getNameSegment().getPath());
+ }
+
+ private Predicate asPredicate(Object value) {
+ return value instanceof Predicate ? (Predicate) value : null;
+ }
+
+ private enum Comparison {
+ EQ,
+ NE,
+ LT,
+ LE,
+ GT,
+ GE;
+
+ public Comparison flip() {
+ switch (this) {
+ case LT:
+ return GT;
+ case LE:
+ return GE;
+ case GT:
+ return LT;
+ case GE:
+ return LE;
+ case EQ:
+ case NE:
+ default:
+ return this;
+ }
+ }
+ }
+
+ public static class Context {
+ private final RowType rowType;
+ private final PredicateBuilder predicateBuilder;
+
+ public Context(RowType rowType) {
+ this.rowType = rowType;
+ this.predicateBuilder = new PredicateBuilder(rowType);
+ }
+ }
+
+ private static class LiteralValue {
+ private final Object value;
+
+ private LiteralValue(Object value) {
+ this.value = value;
+ }
+
+ private Object value() {
+ return value;
+ }
+ }
+}
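
A rough sketch of how this translator is driven at planning time (mirroring its use in PaimonPluginImplementor below; formatPlugin, path and filterExpr stand in for values available there):

    // A null result means the expression is not fully translatable,
    // so the filter stays in Drill instead of being pushed into Paimon.
    Table table = PaimonTableUtils.loadTable(formatPlugin, path);
    Predicate predicate = DrillExprToPaimonTranslator.translate(filterExpr, table.rowType());
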
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java
new file mode 100644
index 0000000000..e1af514428
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.PaimonTableUtils;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PaimonPluginImplementor extends AbstractPluginImplementor {
+
+ private PaimonGroupScan groupScan;
+
+ @Override
+ public void implement(StoragePluginTableScan scan) {
+ groupScan = (PaimonGroupScan) scan.getGroupScan();
+ }
+
+ @Override
+ public void implement(PluginFilterRel filter) throws IOException {
+ visitChild(filter.getInput());
+
+ RexNode condition = filter.getCondition();
+ LogicalExpression expression = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+ filter.getInput(),
+ condition);
+ groupScan = groupScan.toBuilder().condition(expression).build();
+ }
+
+ @Override
+ public void implement(PluginProjectRel project) throws IOException {
+ visitChild(project.getInput());
+
+ DrillParseContext context = new DrillParseContext(PrelUtil.getPlannerSettings(project.getCluster().getPlanner()));
+ RelNode input = project.getInput();
+
+ List<SchemaPath> projects = project.getProjects().stream()
+ .map(e -> (SchemaPath) DrillOptiq.toDrill(context, input, e))
+ .collect(Collectors.toList());
+ groupScan = groupScan.clone(projects);
+ }
+
+ @Override
+ public void implement(PluginLimitRel limit) throws IOException {
+ visitChild(limit.getInput());
+ int maxRecords = getArtificialLimit(limit);
+ if (maxRecords >= 0) {
+ groupScan = groupScan.applyLimit(maxRecords);
+ }
+ }
+
+ @Override
+ public boolean canImplement(Filter filter) {
+ RexNode condition = filter.getCondition();
+ LogicalExpression logicalExpression = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+ filter.getInput(),
+ condition);
+ GroupScan scan = findGroupScan(filter);
+ if (!(scan instanceof PaimonGroupScan)) {
+ return false;
+ }
+ PaimonGroupScan paimonGroupScan = (PaimonGroupScan) scan;
+ try {
+ Table table = PaimonTableUtils.loadTable(paimonGroupScan.getFormatPlugin(), paimonGroupScan.getPath());
+ RowType rowType = table.rowType();
+ return DrillExprToPaimonTranslator.translate(logicalExpression, rowType) != null;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canImplement(DrillLimitRelBase limit) {
+ if (hasPluginGroupScan(limit)) {
+ FirstLimitFinder finder = new FirstLimitFinder();
+ limit.getInput().accept(finder);
+ int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
+ int newLimit = getArtificialLimit(limit);
+ return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean artificialLimit() {
+ return true;
+ }
+
+ @Override
+ public boolean canImplement(Project project) {
+ return hasPluginGroupScan(project);
+ }
+
+ @Override
+ public boolean splitProject(Project project) {
+ return true;
+ }
+
+ @Override
+ protected boolean hasPluginGroupScan(RelNode node) {
+ return findGroupScan(node) instanceof PaimonGroupScan;
+ }
+
+ @Override
+ public GroupScan getPhysicalOperator() {
+ return groupScan;
+ }
+
+ @Override
+ protected Class<? extends StoragePlugin> supportedPlugin() {
+ return FileSystemPlugin.class;
+ }
+
+ private int rexLiteralIntValue(RexLiteral offset) {
+ return ((BigDecimal) offset.getValue()).intValue();
+ }
+
+ private int getArtificialLimit(DrillLimitRelBase limit) {
+ return getArtificialLimit(limit.getFetch(), limit.getOffset());
+ }
+
+ private int getArtificialLimit(RexNode fetch, RexNode offset) {
+ int maxRows = -1;
+ if (fetch != null) {
+ maxRows = rexLiteralIntValue((RexLiteral) fetch);
+ if (offset != null) {
+ maxRows += rexLiteralIntValue((RexLiteral) offset);
+ }
+ }
+ return maxRows;
+ }
+
+ private static class FirstLimitFinder extends RelShuttleImpl {
+ private RexNode fetch;
+
+ private RexNode offset;
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof DrillLimitRelBase) {
+ DrillLimitRelBase limitRelBase = (DrillLimitRelBase) other;
+ fetch = limitRelBase.getFetch();
+ offset = limitRelBase.getOffset();
+ return other;
+ } else if (other instanceof RelSubset) {
+ RelSubset relSubset = (RelSubset) other;
+ Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+ }
+ return super.visit(other);
+ }
+
+ public RexNode getFetch() {
+ return fetch;
+ }
+
+ public RexNode getOffset() {
+ return offset;
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java
new file mode 100644
index 0000000000..1b411e3ef9
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.LocalZoneTimestamp;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+public class PaimonColumnConverterFactory extends ColumnConverterFactory {
+
+ public PaimonColumnConverterFactory(TupleMetadata providedSchema) {
+ super(providedSchema);
+ }
+
+ @Override
+ public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) {
+ switch (readerSchema.type()) {
+ case TIMESTAMP:
+ case TIMESTAMPTZ:
+ return new ColumnConverter.ScalarColumnConverter(value -> writer.setTimestamp(asInstant(value)));
+ case VARDECIMAL:
+ return new ColumnConverter.ScalarColumnConverter(value -> writer.setDecimal(asBigDecimal(value)));
+ case VARCHAR:
+ return new ColumnConverter.ScalarColumnConverter(value -> writer.setString(asString(value)));
+ case VARBINARY:
+ return new ColumnConverter.ScalarColumnConverter(value -> {
+ byte[] bytes = (byte[]) value;
+ writer.setBytes(bytes, bytes.length);
+ });
+ default:
+ return super.buildScalar(readerSchema, writer);
+ }
+ }
+
+ public static ColumnMetadata getColumnMetadata(DataField field) {
+ DataType type = field.type();
+ String name = field.name();
+ TypeProtos.DataMode dataMode = type.isNullable()
+ ? TypeProtos.DataMode.OPTIONAL
+ : TypeProtos.DataMode.REQUIRED;
+ return getColumnMetadata(name, type, dataMode);
+ }
+
+ public static TupleSchema convertSchema(RowType rowType) {
+ TupleSchema schema = new TupleSchema();
+ for (DataField field : rowType.getFields()) {
+ ColumnMetadata columnMetadata = getColumnMetadata(field);
+ schema.add(columnMetadata);
+ }
+ return schema;
+ }
+
+ private static ColumnMetadata getColumnMetadata(String name, DataType type, TypeProtos.DataMode dataMode) {
+ DataTypeRoot typeRoot = type.getTypeRoot();
+ switch (typeRoot) {
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case ROW:
+ case VARIANT:
+ throw new UnsupportedOperationException(String.format("Unsupported
type: %s for column: %s", typeRoot, name));
+ default:
+ return getPrimitiveMetadata(name, type, dataMode);
+ }
+ }
+
+ private static ColumnMetadata getPrimitiveMetadata(String name, DataType type, TypeProtos.DataMode dataMode) {
+ TypeProtos.MinorType minorType = getType(type.getTypeRoot());
+ if (minorType == null) {
+ throw new UnsupportedOperationException(String.format("Unsupported type: %s for column: %s", type.getTypeRoot(), name));
+ }
+ TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder()
+ .setMinorType(minorType)
+ .setMode(dataMode);
+ if (type.getTypeRoot() == DataTypeRoot.DECIMAL) {
+ DecimalType decimalType = (DecimalType) type;
+ builder.setScale(decimalType.getScale())
+ .setPrecision(decimalType.getPrecision());
+ }
+ MaterializedField materializedField = MaterializedField.create(name, builder.build());
+ return MetadataUtils.fromField(materializedField);
+ }
+
+ private static TypeProtos.MinorType getType(DataTypeRoot typeRoot) {
+ switch (typeRoot) {
+ case BOOLEAN:
+ return TypeProtos.MinorType.BIT;
+ case TINYINT:
+ return TypeProtos.MinorType.TINYINT;
+ case SMALLINT:
+ return TypeProtos.MinorType.SMALLINT;
+ case INTEGER:
+ return TypeProtos.MinorType.INT;
+ case BIGINT:
+ return TypeProtos.MinorType.BIGINT;
+ case FLOAT:
+ return TypeProtos.MinorType.FLOAT4;
+ case DOUBLE:
+ return TypeProtos.MinorType.FLOAT8;
+ case DATE:
+ return TypeProtos.MinorType.DATE;
+ case TIME_WITHOUT_TIME_ZONE:
+ return TypeProtos.MinorType.TIME;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TypeProtos.MinorType.TIMESTAMP;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TypeProtos.MinorType.TIMESTAMPTZ;
+ case CHAR:
+ case VARCHAR:
+ return TypeProtos.MinorType.VARCHAR;
+ case BINARY:
+ case VARBINARY:
+ return TypeProtos.MinorType.VARBINARY;
+ case DECIMAL:
+ return TypeProtos.MinorType.VARDECIMAL;
+ default:
+ return null;
+ }
+ }
+
+ private static String asString(Object value) {
+ if (value instanceof BinaryString) {
+ return value.toString();
+ }
+ return (String) value;
+ }
+
+ private static BigDecimal asBigDecimal(Object value) {
+ if (value instanceof Decimal) {
+ return ((Decimal) value).toBigDecimal();
+ }
+ return (BigDecimal) value;
+ }
+
+ private static Instant asInstant(Object value) {
+ if (value instanceof Timestamp) {
+ return ((Timestamp) value).toInstant();
+ }
+ if (value instanceof LocalZoneTimestamp) {
+ return ((LocalZoneTimestamp) value).toInstant();
+ }
+ if (value instanceof java.sql.Timestamp) {
+ return ((java.sql.Timestamp) value).toInstant();
+ }
+ if (value instanceof Long) {
+ return Instant.ofEpochMilli((Long) value);
+ }
+ return (Instant) value;
+ }
+}
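
For instance, mapping a Paimon row type to a Drill tuple schema is a single call (sketch; table is assumed to be loaded via PaimonTableUtils):

    // DECIMAL keeps its precision and scale; nested types such as ARRAY, MAP,
    // MULTISET, ROW and VARIANT are rejected with UnsupportedOperationException.
    TupleSchema schema = PaimonColumnConverterFactory.convertSchema(table.rowType());
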
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java
new file mode 100644
index 0000000000..312838556d
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.paimon.PaimonTableUtils;
+import org.apache.drill.exec.store.paimon.PaimonWork;
+import org.apache.drill.exec.store.paimon.PaimonReadUtils;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PaimonRecordReader implements ManagedReader {
+ private static final Logger logger = LoggerFactory.getLogger(PaimonRecordReader.class);
+
+ private final PaimonFormatPlugin formatPlugin;
+
+ private final String path;
+
+ private final List<SchemaPath> columns;
+
+ private final LogicalExpression condition;
+
+ private final PaimonWork work;
+
+ private final int maxRecords;
+
+ private ResultSetLoader loader;
+
+ private ColumnConverter[] converters;
+
+ private InternalRow.FieldGetter[] getters;
+
+ private RecordReader<InternalRow> recordReader;
+
+ private RecordReader.RecordIterator<InternalRow> currentBatch;
+
+ private OperatorStats stats;
+
+ private int lastSchemaVersion = -1;
+
+ public PaimonRecordReader(PaimonFormatPlugin formatPlugin, String path,
+ List<SchemaPath> columns, LogicalExpression condition, PaimonWork work, int maxRecords,
+ SchemaNegotiator negotiator) {
+ this.formatPlugin = formatPlugin;
+ this.path = path;
+ this.columns = columns;
+ this.condition = condition;
+ this.work = work;
+ this.maxRecords = maxRecords;
+ try {
+ Table table = PaimonTableUtils.loadTable(formatPlugin, path);
+ RowType rowType = table.rowType();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ PaimonReadUtils.applyFilter(readBuilder, rowType, condition);
+ PaimonReadUtils.applyProjection(readBuilder, rowType, columns);
+ RowType readType = readBuilder.readType();
+
+ TupleSchema tableSchema = PaimonColumnConverterFactory.convertSchema(readType);
+ TupleMetadata providedSchema = negotiator.providedSchema();
+ TupleMetadata tupleSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, tableSchema);
+ negotiator.tableSchema(tupleSchema, true);
+ loader = negotiator.build();
+
+ converters = buildConverters(providedSchema, tableSchema);
+ getters = buildGetters(readType);
+
+ recordReader = readBuilder.newRead().executeFilter().createReader(work.getSplit());
+ currentBatch = null;
+ stats = negotiator.context().getStats();
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .message("Failed to open Paimon reader for %s", path)
+ .build(logger);
+ }
+ }
+
+ @Override
+ public boolean next() {
+ RowSetLoader rowWriter = loader.writer();
+ while (!rowWriter.isFull()) {
+ if (!nextLine(rowWriter)) {
+ updateStats(rowWriter);
+ return false;
+ }
+ }
+ updateStats(rowWriter);
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (currentBatch != null) {
+ currentBatch.releaseBatch();
+ }
+ AutoCloseables.closeSilently(recordReader);
+ if (loader != null) {
+ loader.close();
+ }
+ }
+
+ private boolean nextLine(RowSetLoader rowWriter) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
+ InternalRow row = nextRow();
+ if (row == null) {
+ return false;
+ }
+
+ rowWriter.start();
+ for (int i = 0; i < getters.length; i++) {
+ converters[i].convert(getters[i].getFieldOrNull(row));
+ }
+ rowWriter.save();
+
+ return true;
+ }
+
+ private InternalRow nextRow() {
+ try {
+ while (true) {
+ if (currentBatch == null) {
+ currentBatch = recordReader.readBatch();
+ if (currentBatch == null) {
+ return null;
+ }
+ }
+ InternalRow row = currentBatch.next();
+ if (row != null) {
+ return row;
+ }
+ currentBatch.releaseBatch();
+ currentBatch = null;
+ }
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .message("Failed to read Paimon data for %s", path)
+ .build(logger);
+ }
+ }
+
+ private void updateStats(RowSetLoader rowWriter) {
+ if (stats == null) {
+ return;
+ }
+ int rowCount = rowWriter.rowCount();
+ if (rowCount == 0) {
+ return;
+ }
+ int schemaVersion = loader.schemaVersion();
+ boolean isNewSchema = schemaVersion != lastSchemaVersion;
+ lastSchemaVersion = schemaVersion;
+ stats.batchReceived(0, rowCount, isNewSchema);
+ }
+
+ private ColumnConverter[] buildConverters(TupleMetadata providedSchema, TupleSchema tableSchema) {
+ ColumnConverterFactory factory = new PaimonColumnConverterFactory(providedSchema);
+ ColumnConverter[] columnConverters = new ColumnConverter[tableSchema.size()];
+ for (int i = 0; i < tableSchema.size(); i++) {
+ ColumnMetadata columnMetadata = tableSchema.metadata(i);
+ columnConverters[i] = factory.getConverter(providedSchema, columnMetadata,
+ loader.writer().column(columnMetadata.name()));
+ }
+ return columnConverters;
+ }
+
+ private InternalRow.FieldGetter[] buildGetters(RowType readType) {
+ List<DataField> fields = readType.getFields();
+ InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ fieldGetters[i] = InternalRow.createFieldGetter(fields.get(i).type(), i);
+ }
+ return fieldGetters;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java
new file mode 100644
index 0000000000..ee3e2f5af6
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.paimon.PaimonSubScan;
+import org.apache.drill.exec.store.paimon.PaimonWork;
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class PaimonScanBatchCreator implements BatchCreator<PaimonSubScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+ PaimonSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+
+ try {
+ ScanLifecycleBuilder builder = createBuilder(subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (UserException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private ScanLifecycleBuilder createBuilder(PaimonSubScan subScan) {
+ ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
+ builder.projection(subScan.getColumns());
+ builder.userName(subScan.getUserName());
+ builder.providedSchema(subScan.getSchema());
+ builder.readerFactory(new PaimonReaderFactory(subScan));
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ return builder;
+ }
+
+ private static class PaimonReaderFactory implements ReaderFactory<SchemaNegotiator> {
+ private final PaimonSubScan subScan;
+ private final Iterator<PaimonWork> workIterator;
+
+ private PaimonReaderFactory(PaimonSubScan subScan) {
+ this.subScan = subScan;
+ this.workIterator = subScan.getWorkList().iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return workIterator.hasNext();
+ }
+
+ @Override
+ public ManagedReader next(SchemaNegotiator negotiator) {
+ return new PaimonRecordReader(subScan.getFormatPlugin(), subScan.getPath(),
+ subScan.getColumns(), subScan.getCondition(), workIterator.next(), subScan.getMaxRecords(),
+ negotiator);
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json b/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000000..786d5fd747
--- /dev/null
+++ b/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,20 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "paimon": {
+ "type": "paimon"
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "paimon": {
+ "type": "paimon"
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/resources/drill-module.conf b/contrib/format-paimon/src/main/resources/drill-module.conf
new file mode 100644
index 0000000000..44156959a3
--- /dev/null
+++ b/contrib/format-paimon/src/main/resources/drill-module.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.paimon"
+}
diff --git a/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java b/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java
new file mode 100644
index 0000000000..1a500ca4f2
--- /dev/null
+++ b/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PaimonQueriesTest extends ClusterTest {
+
+ private static final String DB_NAME = "default";
+ private static final String TABLE_NAME = "append_table";
+ private static final String PK_TABLE_NAME = "pk_table";
+ private static final String PAIMON_SCAN_PATTERN = "(PAIMON_GROUP_SCAN|PaimonGroupScan)";
+ private static String tableRelativePath;
+ private static String pkTableRelativePath;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+ FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+ Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+ formats.put("paimon", PaimonFormatPluginConfig.builder().build());
+ FileSystemConfig newPluginConfig = new FileSystemConfig(
+ pluginConfig.getConnection(),
+ pluginConfig.getConfig(),
+ pluginConfig.getWorkspaces(),
+ formats,
+ PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ newPluginConfig.setEnabled(pluginConfig.isEnabled());
+ pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+ tableRelativePath = createAppendTable();
+ pkTableRelativePath = createPrimaryKeyTable();
+ }
+
+ @Test
+ public void testReadAppendTable() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s`",
tableRelativePath);
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(2, "bob")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+ }
+
+ @Test
+ public void testReadPrimaryKeyTable() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s`",
pkTableRelativePath);
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "dave")
+ .addRow(2, "erin")
+ .build();
+ new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+ }
+
+ @Test
+ public void testProjectionPushdown() throws Exception {
+ String query = String.format("select name from dfs.tmp.`%s`",
tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*columns=\\[.*name.*\\]")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("alice")
+ .addRow("bob")
+ .addRow("carol")
+ .build();
+ new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+ }
+
+ @Test
+ public void testMultiColumnProjection() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s`",
tableRelativePath);
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(2, "bob")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdown() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id =
2", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(2, "bob")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownGT() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id >
1", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(2, "bob")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownLT() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id < 3", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*3")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(2, "bob")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownGE() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id >= 2", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(2, "bob")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownLE() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id <= 2", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(2, "bob")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownNE() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id <> 2", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownAnd() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id > 1 and id < 3", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanAnd")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(2, "bob")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownOr() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id = 1 or id = 3", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanOr")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+ }
+
+ @Test
+ public void testFilterPushdownNot() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where not (id = 2)", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+ }
+
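+ // Limit pushdown: LIMIT n should surface as maxRecords=n on the Paimon scan.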
+ @Test
+ public void testLimitPushdown() throws Exception {
+ String query = String.format("select id from dfs.tmp.`%s` limit 2", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*maxRecords=2")
+ .match(true);
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(2, results.rowCount());
+ results.clear();
+ }
+
+ @Test
+ public void testCombinedPushdownFilterProjectionLimit() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` where id > 1 limit 1", tableRelativePath);
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1")
+ .include(PAIMON_SCAN_PATTERN + ".*maxRecords=1")
+ .match(true);
+
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(2, "bob")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testSelectWildcard() throws Exception {
+ String query = String.format("select * from dfs.tmp.`%s`", tableRelativePath);
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "alice")
+ .addRow(2, "bob")
+ .addRow(3, "carol")
+ .build();
+ new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+ }
+
+ @Test
+ public void testSelectWithOrderBy() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s` order by id desc", tableRelativePath);
+ RowSet results = queryBuilder().sql(query).rowSet();
+ TupleMetadata actualSchema = results.schema();
+ assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+ assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+ .add("name", TypeProtos.MinorType.VARCHAR,
actualSchema.metadata("name").mode())
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(3, "carol")
+ .addRow(2, "bob")
+ .addRow(1, "alice")
+ .build();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testSelectWithCount() throws Exception {
+ String query = String.format("select count(*) from dfs.tmp.`%s`", tableRelativePath);
+
+ assertEquals(3, queryBuilder().sql(query).singletonLong());
+ }
+
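+ // Serializes the physical plan to JSON and re-executes it, verifying that the
+ // group scan (including the pinned snapshot id) round-trips correctly.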
+ @Test
+ public void testSerDe() throws Exception {
+ String snapshotQuery = String.format(
+ "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time
limit 1", tableRelativePath);
+
+ long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+ String sql = String.format(
+ "select count(*) as cnt from table(dfs.tmp.`%s`(type => 'paimon',
snapshotId => %d))",
+ tableRelativePath, snapshotId);
+ String plan = queryBuilder().sql(sql).explainJson();
+ long count = queryBuilder().physical(plan).singletonLong();
+ assertEquals(2, count);
+ }
+
+ @Test
+ public void testInvalidColumnName() throws Exception {
+ String query = String.format("select id, invalid_column from dfs.tmp.`%s`", tableRelativePath);
+ try {
+ queryBuilder().sql(query).run();
+ fail("Expected UserRemoteException for invalid column name");
+ } catch (UserRemoteException e) {
+ assertThat(e.getVerboseMessage(), containsString("invalid_column"));
+ }
+ }
+
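+ // Time travel by snapshot id: the earliest snapshot holds only the first two rows.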
+ @Test
+ public void testSelectWithSnapshotId() throws Exception {
+ String snapshotQuery = String.format(
+ "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time
limit 1", tableRelativePath);
+
+ long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+ String query = String.format(
+ "select id, name from table(dfs.tmp.`%s`(type => 'paimon', snapshotId =>
%d))",
+ tableRelativePath, snapshotId);
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(2, count);
+ }
+
+ @Test
+ public void testSelectWithSnapshotAsOfTime() throws Exception {
+ String snapshotQuery = String.format(
+ "select commit_time from dfs.tmp.`%s#snapshots` order by commit_time
limit 1", tableRelativePath);
+
+ long snapshotTime = queryBuilder().sql(snapshotQuery).singletonLong();
+ String query = String.format(
+ "select id, name from table(dfs.tmp.`%s`(type => 'paimon',
snapshotAsOfTime => %d))",
+ tableRelativePath, snapshotTime);
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(2, count);
+ }
+
+ @Test
+ public void testSelectWithSnapshotIdAndSnapshotAsOfTime() throws Exception {
+ String query = String.format(
+ "select * from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d,
snapshotAsOfTime => %d))",
+ tableRelativePath, 123, 456);
+ try {
+ queryBuilder().sql(query).run();
+ fail();
+ } catch (UserRemoteException e) {
+ assertThat(e.getVerboseMessage(),
+ containsString("Both 'snapshotId' and 'snapshotAsOfTime' cannot be
specified"));
+ }
+ }
+
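+ // The #snapshots, #schemas, #files and #manifests suffixes expose Paimon metadata tables.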
+ @Test
+ public void testSelectSnapshotsMetadata() throws Exception {
+ String query = String.format("select * from dfs.tmp.`%s#snapshots`", tableRelativePath);
+
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(2, count);
+ }
+
+ @Test
+ public void testSelectSchemasMetadata() throws Exception {
+ String query = String.format("select * from dfs.tmp.`%s#schemas`", tableRelativePath);
+
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void testSelectFilesMetadata() throws Exception {
+ String query = String.format("select * from dfs.tmp.`%s#files`", tableRelativePath);
+
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(2, count);
+ }
+
+ @Test
+ public void testSelectManifestsMetadata() throws Exception {
+ String query = String.format("select * from dfs.tmp.`%s#manifests`", tableRelativePath);
+
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(2, count);
+ }
+
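+ // Builds an append-only table and writes it in two batches, yielding two
+ // snapshots and two data files; returns the table path relative to the dfs root.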
+ private static String createAppendTable() throws Exception {
+ Path dfsRoot = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath());
+ Path warehouseDir = dfsRoot.resolve("paimon_warehouse");
+
+ Options options = new Options();
+ options.set("warehouse", warehouseDir.toUri().toString());
+ options.set("metastore", "filesystem");
+
+ CatalogContext context = CatalogContext.create(options, new Configuration());
+ try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+ catalog.createDatabase(DB_NAME, true);
+
+ Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .build();
+ Identifier identifier = Identifier.create(DB_NAME, TABLE_NAME);
+ catalog.createTable(identifier, schema, false);
+
+ Table table = catalog.getTable(identifier);
+ writeRows(table, Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("alice")),
+ GenericRow.of(2, BinaryString.fromString("bob"))
+ ));
+ writeRows(table, Arrays.asList(
+ GenericRow.of(3, BinaryString.fromString("carol"))
+ ));
+ }
+
+ Path tablePath = warehouseDir.resolve(DB_NAME + ".db").resolve(TABLE_NAME);
+ Path relativePath = dfsRoot.relativize(tablePath);
+ return relativePath.toString().replace('\\', '/');
+ }
+
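+ // Builds a single-bucket primary-key table with one commit of two rows.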
+ private static String createPrimaryKeyTable() throws Exception {
+ Path dfsRoot = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath());
+ Path warehouseDir = dfsRoot.resolve("paimon_warehouse");
+
+ Options options = new Options();
+ options.set("warehouse", warehouseDir.toUri().toString());
+ options.set("metastore", "filesystem");
+
+ CatalogContext context = CatalogContext.create(options, new Configuration());
+ try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+ catalog.createDatabase(DB_NAME, true);
+
+ Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .option(CoreOptions.BUCKET.key(), "1")
+ .primaryKey("id")
+ .build();
+ Identifier identifier = Identifier.create(DB_NAME, PK_TABLE_NAME);
+ catalog.createTable(identifier, schema, false);
+
+ Table table = catalog.getTable(identifier);
+ writeRows(table, Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("dave")),
+ GenericRow.of(2, BinaryString.fromString("erin"))
+ ));
+ }
+
+ Path tablePath = warehouseDir.resolve(DB_NAME + ".db").resolve(PK_TABLE_NAME);
+ Path relativePath = dfsRoot.relativize(tablePath);
+ return relativePath.toString().replace('\\', '/');
+ }
+
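+ // Writes one batch and commits it; each call produces one Paimon snapshot.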
+ private static void writeRows(Table table, List<GenericRow> rows) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ List<CommitMessage> messages;
+ try (BatchTableWrite write = writeBuilder.newWrite()) {
+ for (GenericRow row : rows) {
+ write.write(row);
+ }
+ messages = write.prepareCommit();
+ }
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(messages);
+ }
+ }
+
+}
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 50ccdc90dd..4c1e0bfc19 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -51,6 +51,7 @@
<module>format-image</module>
<module>format-log</module>
<module>format-ltsv</module>
+ <module>format-paimon</module>
<module>format-pcapng</module>
<module>format-pdf</module>
<module>format-sas</module>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index a35882c483..e73ec3f7e6 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -537,6 +537,11 @@
<artifactId>drill-iceberg-format</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-paimon-format</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.drill.contrib</groupId>
<artifactId>drill-deltalake-format</artifactId>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 66792fd43f..d30caef9a8 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -48,6 +48,7 @@
<include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
<include>org.apache.drill.contrib:drill-mongo-storage:jar</include>
<include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
+ <include>org.apache.drill.contrib:drill-paimon-format:jar</include>
<include>org.apache.drill.contrib:drill-storage-cassandra:jar</include>
<include>org.apache.drill.contrib:drill-storage-elasticsearch:jar</include>
<include>org.apache.drill.contrib:drill-storage-googlesheets:jar</include>
diff --git a/pom.xml b/pom.xml
index 2ab9e8138c..85b2bcb457 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@
<!-- Please keep this list sorted. -->
<aircompressor.version>0.25</aircompressor.version>
<antlr.version>4.9.3</antlr.version>
+ <apiguardian.version>1.1.2</apiguardian.version>
<asm.version>9.5</asm.version>
<avatica.version>1.23.0</avatica.version>
<avro.version>1.12.0</avro.version>
@@ -93,6 +94,7 @@
<httpclient.version>4.5.14</httpclient.version>
<httpdlog-parser.version>5.11.0</httpdlog-parser.version>
<iceberg.version>0.12.1</iceberg.version>
+ <paimon.version>1.3.1</paimon.version>
<jackson.version>2.18.3</jackson.version>
<janino.version>3.1.12</janino.version>
<javassist.version>3.29.2-GA</javassist.version>
@@ -1013,6 +1015,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apiguardian</groupId>
+ <artifactId>apiguardian-api</artifactId>
+ <version>${apiguardian.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>