This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b005f379e DRILL-8542: Support Paimon Format Plugin (#3035)
1b005f379e is described below

commit 1b005f379ee6d4cbbb24c2544f46a7a57c54dde7
Author: Letian Jiang <[email protected]>
AuthorDate: Fri Jan 30 00:52:23 2026 +0800

    DRILL-8542: Support Paimon Format Plugin (#3035)
---
 contrib/format-paimon/README.md                    |  96 +++
 contrib/format-paimon/pom.xml                      | 123 ++++
 .../exec/store/paimon/PaimonCompleteWork.java      |  60 ++
 .../drill/exec/store/paimon/PaimonGroupScan.java   | 401 ++++++++++++
 .../drill/exec/store/paimon/PaimonReadUtils.java   | 107 ++++
 .../drill/exec/store/paimon/PaimonSubScan.java     | 232 +++++++
 .../drill/exec/store/paimon/PaimonTableUtils.java  | 105 ++++
 .../apache/drill/exec/store/paimon/PaimonWork.java | 132 ++++
 .../format/PaimonFormatLocationTransformer.java    |  61 ++
 .../store/paimon/format/PaimonFormatMatcher.java   |  85 +++
 .../store/paimon/format/PaimonFormatPlugin.java    | 221 +++++++
 .../paimon/format/PaimonFormatPluginConfig.java    | 123 ++++
 .../paimon/format/PaimonMetadataFileSelection.java |  33 +
 .../store/paimon/format/PaimonMetadataType.java    |  54 ++
 .../paimon/plan/DrillExprToPaimonTranslator.java   | 341 +++++++++++
 .../store/paimon/plan/PaimonPluginImplementor.java | 203 ++++++
 .../paimon/read/PaimonColumnConverterFactory.java  | 183 ++++++
 .../exec/store/paimon/read/PaimonRecordReader.java | 215 +++++++
 .../store/paimon/read/PaimonScanBatchCreator.java  |  88 +++
 .../main/resources/bootstrap-format-plugins.json   |  20 +
 .../src/main/resources/drill-module.conf           |  25 +
 .../drill/exec/store/paimon/PaimonQueriesTest.java | 679 +++++++++++++++++++++
 contrib/pom.xml                                    |   1 +
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |   1 +
 pom.xml                                            |   7 +
 26 files changed, 3601 insertions(+)

diff --git a/contrib/format-paimon/README.md b/contrib/format-paimon/README.md
new file mode 100644
index 0000000000..224405dacc
--- /dev/null
+++ b/contrib/format-paimon/README.md
@@ -0,0 +1,96 @@
+# Apache Paimon format plugin
+
+This format plugin enables Drill to query Apache Paimon tables.
+
+Unlike regular format plugins, a Paimon table is a directory containing data and metadata files. Drill checks for
+the presence of the `snapshot` and `schema` directories to verify that the directory is a Paimon table.
+
+Drill supports reading all Paimon table file formats currently supported via the Paimon Java API: Parquet and ORC.
+There is no need to provide the actual table format; it is discovered automatically.
+
+For details on the Apache Paimon table format, please refer to the [official docs](https://paimon.apache.org/).
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project pushdown optimization.
+
+With project pushdown, only the columns specified in the query are read. In conjunction with
+column-oriented formats like Parquet or ORC, this improves read performance significantly.
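+
+For example, the following query against a hypothetical `users` table reads only the `id` and `name` columns from
+storage:
+
+```sql
+SELECT id, name
+FROM dfs.tmp.`users`;
+```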
+
+### Filter pushdown
+
+This format plugin supports filter pushdown optimization.
+
+With filter pushdown, expressions supported by the Paimon API are pushed down, so only data that matches
+the filter expression is read.
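+
+For example, the comparison predicate in the following query against a hypothetical `users` table can be translated
+into a Paimon predicate and evaluated during the scan:
+
+```sql
+SELECT *
+FROM dfs.tmp.`users`
+WHERE id > 100;
+```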
+
+### Limit pushdown
+
+This format plugin supports limit pushdown optimization.
+
+The limit is pushed down to Paimon scan planning to reduce the amount of data read.
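+
+For example, the following query against a hypothetical `users` table reads no more data than is needed to produce
+10 rows:
+
+```sql
+SELECT *
+FROM dfs.tmp.`users`
+LIMIT 10;
+```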
+
+### Querying table metadata
+
+Apache Drill provides the ability to query table metadata exposed by Paimon.
+
+At this point, Apache Paimon has the following metadata kinds:
+
+* SNAPSHOTS
+* SCHEMAS
+* FILES
+* MANIFESTS
+
+To query specific metadata, just add the `#metadata_name` suffix to the table location, like in the following example:
+
+```sql
+SELECT *
+FROM dfs.tmp.`testTable#snapshots`;
+```
+
+### Querying specific table versions (time travel)
+
+Apache Paimon tracks table modifications and can read a specific table version from before or after a modification,
+or the modifications themselves.
+
+This format plugin embraces this ability and provides an easy-to-use way of triggering it.
+
+The following ways of specifying table version are supported:
+
+- `snapshotId` - id of the specific snapshot
+- `snapshotAsOfTime` - the most recent snapshot as of the given time in milliseconds
+
+A table function can be used to specify one of the above configs in the following way:
+
+```sql
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotId => 1));
+
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotAsOfTime => 1736345510000));
+```
+
+Note: `snapshotId` and `snapshotAsOfTime` are mutually exclusive and cannot be specified at the same time.
+
+## Configuration
+
+The only required configuration option is:
+
+- `type` - format plugin type, should be `'paimon'`
+
+Note: `snapshotId` and `snapshotAsOfTime` for time travel queries are specified at query time using the `table()` function.
+
+### Format config example:
+
+```json
+{
+  "type": "file",
+  "formats": {
+    "paimon": {
+      "type": "paimon"
+    }
+  }
+}
+```
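+
+With the format configured on a `dfs`-style storage plugin, a Paimon table directory (the path below is
+hypothetical) can be queried directly:
+
+```sql
+SELECT *
+FROM dfs.tmp.`paimonTable`;
+```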
diff --git a/contrib/format-paimon/pom.xml b/contrib/format-paimon/pom.xml
new file mode 100644
index 0000000000..0bc02a9da0
--- /dev/null
+++ b/contrib/format-paimon/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.23.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-paimon-format</artifactId>
+
+  <name>Drill : Contrib : Format : Paimon</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-core</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-common</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-api</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-codegen-loader</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-format</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-jackson-2</artifactId>
+      <version>2.14.2-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-guava-30</artifactId>
+      <version>30.1.1-jre-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-caffeine-2</artifactId>
+      <version>2.9.3-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-netty-4</artifactId>
+      <version>4.1.100.Final-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <version>0.27</version>
+    </dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <version>1.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>1.5.5-11</version>
+    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.1.8.4</version>
+    </dependency>
+
+    <!-- Test dependency -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
new file mode 100644
index 0000000000..7d410ae5f3
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.paimon.table.source.Split;
+
+public class PaimonCompleteWork implements CompleteWork {
+  private final EndpointByteMap byteMap;
+
+  private final Split split;
+
+  private final long totalBytes;
+
+  public PaimonCompleteWork(EndpointByteMap byteMap, Split split) {
+    this.byteMap = byteMap;
+    this.split = split;
+    long rowCount = split.rowCount();
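+    // Use the split's row count as a byte-size proxy for work assignment;
+    // fall back to 1 so empty splits still receive a positive weight.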
+    this.totalBytes = rowCount > 0 ? rowCount : 1;
+  }
+
+  public Split getSplit() {
+    return split;
+  }
+
+  public long getRowCount() {
+    return split.rowCount();
+  }
+
+  @Override
+  public long getTotalBytes() {
+    return totalBytes;
+  }
+
+  @Override
+  public EndpointByteMap getByteMap() {
+    return byteMap;
+  }
+
+  @Override
+  public int compareTo(CompleteWork o) {
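+    // All Paimon splits are treated as equal-cost; ordering does not matter for assignment.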
+    return 0;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
new file mode 100644
index 0000000000..d344f9aafe
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonTypeName("paimon-scan")
+@SuppressWarnings("unused")
+public class PaimonGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(PaimonGroupScan.class);
+
+  private final PaimonFormatPlugin formatPlugin;
+
+  private final String path;
+
+  private final TupleMetadata schema;
+
+  private final LogicalExpression condition;
+
+  private final List<SchemaPath> columns;
+
+  private int maxRecords;
+
+  private List<PaimonCompleteWork> chunks;
+
+  private List<EndpointAffinity> endpointAffinities;
+
+  private ListMultimap<Integer, PaimonCompleteWork> mappings;
+
+  @JsonCreator
+  public PaimonGroupScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("schema") TupleMetadata schema,
+      @JsonProperty("path") String path,
+      @JsonProperty("condition") LogicalExpression condition,
+      @JsonProperty("maxRecords") Integer maxRecords,
+      @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+    this(builder()
+      .userName(userName)
+      .formatPlugin(pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class))
+      .schema(schema)
+      .path(path)
+      .condition(condition)
+      .columns(columns)
+      .maxRecords(maxRecords));
+  }
+
+  private PaimonGroupScan(PaimonGroupScanBuilder builder) throws IOException {
+    super(builder.userName);
+    this.formatPlugin = builder.formatPlugin;
+    this.columns = builder.columns;
+    this.path = builder.path;
+    this.schema = builder.schema;
+    this.condition = builder.condition;
+    this.maxRecords = builder.maxRecords;
+
+    init();
+  }
+
+  private PaimonGroupScan(PaimonGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.formatPlugin = that.formatPlugin;
+    this.path = that.path;
+    this.condition = that.condition;
+    this.schema = that.schema;
+    this.maxRecords = that.maxRecords;
+    this.chunks = that.chunks;
+    this.endpointAffinities = that.endpointAffinities;
+    this.mappings = that.mappings;
+  }
+
+  public static PaimonGroupScanBuilder builder() {
+    return new PaimonGroupScanBuilder();
+  }
+
+  @Override
+  public PaimonGroupScan clone(List<SchemaPath> columns) {
+    try {
+      return toBuilder().columns(columns).build();
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+        .message("Failed to clone Paimon group scan")
+        .build(logger);
+    }
+  }
+
+  @Override
+  public PaimonGroupScan applyLimit(int maxRecords) {
+    PaimonGroupScan clone = new PaimonGroupScan(this);
+    clone.maxRecords = maxRecords;
+    return clone;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    mappings = AssignmentCreator.getMappings(endpoints, chunks);
+  }
+
+  private void createMappings(List<EndpointAffinity> affinities) {
+    List<DrillbitEndpoint> endpoints = affinities.stream()
+      .map(EndpointAffinity::getEndpoint)
+      .collect(Collectors.toList());
+    applyAssignments(endpoints);
+  }
+
+  @Override
+  public PaimonSubScan getSpecificScan(int minorFragmentId) {
+    if (chunks.isEmpty()) {
+      return emptySubScan();
+    }
+
+    if (mappings == null) {
+      createMappings(endpointAffinities);
+    }
+
+    List<PaimonCompleteWork> workList = mappings.get(minorFragmentId);
+    List<PaimonWork> paimonWorkList = workList == null
+      ? Collections.emptyList()
+      : convertWorkList(workList);
+
+    PaimonSubScan subScan = PaimonSubScan.builder()
+      .userName(userName)
+      .formatPlugin(formatPlugin)
+      .columns(columns)
+      .condition(condition)
+      .schema(schema)
+      .workList(paimonWorkList)
+      .path(path)
+      .maxRecords(maxRecords)
+      .build();
+
+    subScan.setOperatorId(getOperatorId());
+    return subScan;
+  }
+
+  private PaimonSubScan emptySubScan() {
+    PaimonSubScan subScan = PaimonSubScan.builder()
+      .userName(userName)
+      .formatPlugin(formatPlugin)
+      .columns(columns)
+      .condition(condition)
+      .schema(schema)
+      .workList(Collections.emptyList())
+      .path(path)
+      .maxRecords(maxRecords)
+      .build();
+    subScan.setOperatorId(getOperatorId());
+    return subScan;
+  }
+
+  private List<PaimonWork> convertWorkList(List<PaimonCompleteWork> workList) {
+    return workList.stream()
+      .map(PaimonCompleteWork::getSplit)
+      .map(PaimonWork::new)
+      .collect(Collectors.toList());
+  }
+
+  @JsonProperty("maxRecords")
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return Math.max(chunks.size(), 1);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    long rowCount = chunks.stream()
+      .mapToLong(PaimonCompleteWork::getRowCount)
+      .sum();
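+    // If Paimon reports no row count, assume 1M rows per split so the planner
+    // does not underestimate the scan cost.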
+    long estimatedRecords = rowCount > 0
+      ? rowCount
+      : Math.max(chunks.size(), 1) * 1_000_000L;
+    return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, estimatedRecords, 1, 0);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new PaimonGroupScan(this);
+  }
+
+  private void init() throws IOException {
+    Table table = PaimonTableUtils.loadTable(formatPlugin, path);
+    String fileFormat = new CoreOptions(table.options()).fileFormatString();
+    // Paimon supports multiple formats; Drill currently reads Parquet/ORC only.
+    if (!"parquet".equalsIgnoreCase(fileFormat) && !"orc".equalsIgnoreCase(fileFormat)) {
+      throw UserException.unsupportedError()
+        .message("Paimon file format '%s' is not supported. Only parquet and orc are supported.", fileFormat)
+        .build(logger);
+    }
+
+    RowType rowType = table.rowType();
+    ReadBuilder readBuilder = table.newReadBuilder();
+    PaimonReadUtils.applyFilter(readBuilder, rowType, condition);
+    PaimonReadUtils.applyProjection(readBuilder, rowType, columns);
+    TableScan tableScan = readBuilder.newScan();
+    List<Split> splits = tableScan.plan().splits();
+    chunks = splits.stream()
+      .map(split -> new PaimonCompleteWork(new EndpointByteMapImpl(), split))
+      .collect(Collectors.toList());
+    endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (endpointAffinities == null) {
+      logger.debug("Chunks size: {}", chunks.size());
+      endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+    }
+    return endpointAffinities;
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  @JsonProperty("columns")
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("schema")
+  public TupleMetadata getSchema() {
+    return schema;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig() {
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig() {
+    return formatPlugin.getConfig();
+  }
+
+  @JsonProperty("path")
+  public String getPath() {
+    return path;
+  }
+
+  @JsonProperty("condition")
+  public LogicalExpression getCondition() {
+    return condition;
+  }
+
+  @JsonIgnore
+  public PaimonFormatPlugin getFormatPlugin() {
+    return formatPlugin;
+  }
+
+  @Override
+  public String getOperatorType() {
+    return "PAIMON_GROUP_SCAN";
+  }
+
+  @Override
+  public String toString() {
+    String conditionString = condition == null ? null : ExpressionStringBuilder.toString(condition).trim();
+    return new PlanStringBuilder(this)
+      .field("path", path)
+      .field("schema", schema)
+      .field("columns", columns)
+      .field("condition", conditionString)
+      .field("maxRecords", maxRecords)
+      .toString();
+  }
+
+  public PaimonGroupScanBuilder toBuilder() {
+    return new PaimonGroupScanBuilder()
+      .userName(this.userName)
+      .formatPlugin(this.formatPlugin)
+      .schema(this.schema)
+      .path(this.path)
+      .condition(this.condition)
+      .columns(this.columns)
+      .maxRecords(this.maxRecords);
+  }
+
+  public static class PaimonGroupScanBuilder {
+    private String userName;
+
+    private PaimonFormatPlugin formatPlugin;
+
+    private TupleMetadata schema;
+
+    private String path;
+
+    private LogicalExpression condition;
+
+    private List<SchemaPath> columns;
+
+    private int maxRecords;
+
+    public PaimonGroupScanBuilder userName(String userName) {
+      this.userName = userName;
+      return this;
+    }
+
+    public PaimonGroupScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) {
+      this.formatPlugin = formatPlugin;
+      return this;
+    }
+
+    public PaimonGroupScanBuilder schema(TupleMetadata schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public PaimonGroupScanBuilder path(String path) {
+      this.path = path;
+      return this;
+    }
+
+    public PaimonGroupScanBuilder condition(LogicalExpression condition) {
+      this.condition = condition;
+      return this;
+    }
+
+    public PaimonGroupScanBuilder columns(List<SchemaPath> columns) {
+      this.columns = columns;
+      return this;
+    }
+
+    public PaimonGroupScanBuilder maxRecords(int maxRecords) {
+      this.maxRecords = maxRecords;
+      return this;
+    }
+
+    public PaimonGroupScan build() throws IOException {
+      return new PaimonGroupScan(this);
+    }
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java
new file mode 100644
index 0000000000..1c69642265
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.paimon.plan.DrillExprToPaimonTranslator;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Utility class for common Paimon read operations such as applying filters
+ * and projections to ReadBuilder instances.
+ */
+public final class PaimonReadUtils {
+  private static final Logger logger = LoggerFactory.getLogger(PaimonReadUtils.class);
+
+  private PaimonReadUtils() {
+    // Utility class
+  }
+
+  /**
+   * Applies a filter expression to the Paimon ReadBuilder.
+   *
+   * @param readBuilder the Paimon ReadBuilder
+   * @param rowType the table row type
+   * @param condition the filter condition
+   */
+  public static void applyFilter(ReadBuilder readBuilder, RowType rowType, LogicalExpression condition) {
+    if (condition == null) {
+      return;
+    }
+    // Translate Drill expression into a Paimon predicate (if supported).
+    Predicate predicate = DrillExprToPaimonTranslator.translate(condition, rowType);
+    if (predicate != null) {
+      readBuilder.withFilter(predicate);
+    }
+  }
+
+  /**
+   * Applies column projection to the Paimon ReadBuilder.
+   *
+   * @param readBuilder the Paimon ReadBuilder
+   * @param rowType the table row type
+   * @param columns the columns to project
+   */
+  public static void applyProjection(ReadBuilder readBuilder, RowType rowType, List<SchemaPath> columns) {
+    if (columns == null || columns.isEmpty()) {
+      return;
+    }
+
+    boolean hasStar = columns.stream().anyMatch(SchemaPath::isDynamicStar);
+    if (hasStar) {
+      return;
+    }
+
+    Set<String> projectedNames = new HashSet<>();
+    List<Integer> projection = new ArrayList<>();
+    for (SchemaPath column : columns) {
+      PathSegment segment = column.getRootSegment();
+      if (segment == null || !segment.isNamed()) {
+        continue;
+      }
+      String name = segment.getNameSegment().getPath();
+      if (!projectedNames.add(name)) {
+        continue;
+      }
+      int index = rowType.getFieldIndex(name);
+      if (index < 0) {
+        throw UserException.validationError()
+          .message("Paimon column not found: %s", name)
+          .build(logger);
+      }
+      projection.add(index);
+    }
+
+    if (!projection.isEmpty()) {
+      int[] projectionArray = projection.stream().mapToInt(Integer::intValue).toArray();
+      readBuilder.withProjection(projectionArray);
+    }
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java
new file mode 100644
index 0000000000..6f94462c1c
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("paimon-read")
+@SuppressWarnings("unused")
+public class PaimonSubScan extends AbstractBase implements SubScan {
+
+  private static final String OPERATOR_TYPE = "PAIMON_SUB_SCAN";
+
+  private final PaimonFormatPlugin formatPlugin;
+
+  private final List<SchemaPath> columns;
+
+  private final LogicalExpression condition;
+
+  private final TupleMetadata schema;
+
+  private final List<PaimonWork> workList;
+
+  private final String path;
+
+  private final int maxRecords;
+
+  @JsonCreator
+  public PaimonSubScan(
+    @JsonProperty("userName") String userName,
+    @JsonProperty("storage") StoragePluginConfig storageConfig,
+    @JsonProperty("format") FormatPluginConfig formatConfig,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("path") String path,
+    @JsonProperty("workList") List<PaimonWork> workList,
+    @JsonProperty("schema") TupleMetadata schema,
+    @JsonProperty("condition") LogicalExpression condition,
+    @JsonProperty("maxRecords") Integer maxRecords,
+    @JacksonInject StoragePluginRegistry pluginRegistry) {
+    super(userName);
+    this.formatPlugin = pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class);
+    this.columns = columns;
+    this.workList = workList;
+    this.path = path;
+    this.condition = condition;
+    this.schema = schema;
+    this.maxRecords = maxRecords;
+  }
+
+  private PaimonSubScan(PaimonSubScanBuilder builder) {
+    super(builder.userName);
+    this.formatPlugin = builder.formatPlugin;
+    this.columns = builder.columns;
+    this.condition = builder.condition;
+    this.schema = builder.schema;
+    this.workList = builder.workList;
+    this.path = builder.path;
+    this.maxRecords = builder.maxRecords;
+  }
+
+  public static PaimonSubScanBuilder builder() {
+    return new PaimonSubScanBuilder();
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(
+    PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  public List<PaimonWork> getWorkList() {
+    return workList;
+  }
+
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  public LogicalExpression getCondition() {
+    return condition;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return this.toBuilder().build();
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig() {
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig() {
+    return formatPlugin.getConfig();
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  public TupleMetadata getSchema() {
+    return schema;
+  }
+
+  @JsonIgnore
+  public PaimonFormatPlugin getFormatPlugin() {
+    return formatPlugin;
+  }
+
+  public PaimonSubScanBuilder toBuilder() {
+    return new PaimonSubScanBuilder()
+      .userName(this.userName)
+      .formatPlugin(this.formatPlugin)
+      .columns(this.columns)
+      .condition(this.condition)
+      .schema(this.schema)
+      .workList(this.workList)
+      .path(this.path)
+      .maxRecords(this.maxRecords);
+  }
+
+  public static class PaimonSubScanBuilder {
+    private String userName;
+
+    private PaimonFormatPlugin formatPlugin;
+
+    private List<SchemaPath> columns;
+
+    private LogicalExpression condition;
+
+    private TupleMetadata schema;
+
+    private List<PaimonWork> workList;
+
+    private String path;
+
+    private int maxRecords;
+
+    public PaimonSubScanBuilder userName(String userName) {
+      this.userName = userName;
+      return this;
+    }
+
+    public PaimonSubScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) {
+      this.formatPlugin = formatPlugin;
+      return this;
+    }
+
+    public PaimonSubScanBuilder columns(List<SchemaPath> columns) {
+      this.columns = columns;
+      return this;
+    }
+
+    public PaimonSubScanBuilder condition(LogicalExpression condition) {
+      this.condition = condition;
+      return this;
+    }
+
+    public PaimonSubScanBuilder schema(TupleMetadata schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public PaimonSubScanBuilder workList(List<PaimonWork> workList) {
+      this.workList = workList;
+      return this;
+    }
+
+    public PaimonSubScanBuilder path(String path) {
+      this.path = path;
+      return this;
+    }
+
+    public PaimonSubScanBuilder maxRecords(int maxRecords) {
+      this.maxRecords = maxRecords;
+      return this;
+    }
+
+    public PaimonSubScan build() {
+      return new PaimonSubScan(this);
+    }
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java
new file mode 100644
index 0000000000..6979799a3b
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatLocationTransformer;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig;
+import org.apache.drill.exec.store.paimon.format.PaimonMetadataType;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.system.SystemTableLoader;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class PaimonTableUtils {
+  private PaimonTableUtils() {
+  }
+
+  /**
+   * Load a Paimon table directly from a filesystem path. If a metadata suffix is present,
+   * returns the corresponding system table; otherwise returns the data table.
+   */
+  public static Table loadTable(PaimonFormatPlugin formatPlugin, String path) throws IOException {
+    PaimonMetadataType metadataType = extractMetadataType(path);
+    String tableLocation = metadataType == null ? path : stripMetadataType(path);
+    Path tablePath = new Path(tableLocation);
+    Options options = new Options();
+    CatalogContext context = CatalogContext.create(options, formatPlugin.getFsConf());
+    FileIO fileIO = FileIO.get(tablePath, context);
+    FileStoreTable table = FileStoreTableFactory.create(fileIO, tablePath);
+    // Apply time-travel and custom options at table load time.
+    Map<String, String> dynamicOptions = buildDynamicOptions(formatPlugin.getConfig());
+    if (!dynamicOptions.isEmpty()) {
+      table = table.copy(dynamicOptions);
+    }
+    if (metadataType == null) {
+      return table;
+    }
+    Table metadataTable = SystemTableLoader.load(metadataType.getName(), table);
+    Preconditions.checkArgument(metadataTable != null, "Unsupported metadata table: %s", metadataType.getName());
+    return metadataTable;
+  }
+
+  private static PaimonMetadataType extractMetadataType(String location) {
+    int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR);
+    if (index < 0) {
+      return null;
+    }
+    String metadataName = location.substring(index + 1);
+    return PaimonMetadataType.from(metadataName);
+  }
+
+  private static String stripMetadataType(String location) {
+    int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR);
+    return index < 0 ? location : location.substring(0, index);
+  }
+
+  private static Map<String, String> buildDynamicOptions(PaimonFormatPluginConfig config) {
+    Map<String, String> dynamicOptions = new HashMap<>();
+    if (config == null) {
+      return dynamicOptions;
+    }
+    if (config.getProperties() != null) {
+      dynamicOptions.putAll(config.getProperties());
+    }
+
+    Long snapshotId = config.getSnapshotId();
+    Long snapshotAsOfTime = config.getSnapshotAsOfTime();
+    Preconditions.checkArgument(snapshotId == null || snapshotAsOfTime == null,
+      "Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified");
+    if (snapshotId != null) {
+      dynamicOptions.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_SNAPSHOT.toString());
+      dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId.toString());
+    } else if (snapshotAsOfTime != null) {
+      dynamicOptions.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_TIMESTAMP.toString());
+      dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), snapshotAsOfTime.toString());
+    }
+
+    return dynamicOptions;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java
new file mode 100644
index 0000000000..ff06d60de8
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.paimon.table.source.Split;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.Objects;
+
+@JsonSerialize(using = PaimonWork.PaimonWorkSerializer.class)
+@JsonDeserialize(using = PaimonWork.PaimonWorkDeserializer.class)
+public class PaimonWork {
+  private final Split split;
+
+  public PaimonWork(Split split) {
+    this.split = split;
+  }
+
+  public Split getSplit() {
+    return split;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PaimonWork that = (PaimonWork) o;
+    return Objects.equals(split, that.split);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(split);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("split", split)
+      .toString();
+  }
+
+  public static class PaimonWorkDeserializer extends StdDeserializer<PaimonWork> {
+
+    private static final Logger logger = LoggerFactory.getLogger(PaimonWorkDeserializer.class);
+
+    public PaimonWorkDeserializer() {
+      super(PaimonWork.class);
+    }
+
+    @Override
+    public PaimonWork deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+      JsonNode node = p.getCodec().readTree(p);
+      String splitString = node.get(PaimonWorkSerializer.SPLIT_FIELD).asText();
+
+      byte[] decoded = Base64.getDecoder().decode(splitString);
+      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(decoded))) {
+        Object split = ois.readObject();
+        if (!(split instanceof Split)) {
+          throw UserException.dataReadError()
+            .message("Deserialized object is not a Paimon Split: %s", 
split.getClass().getName())
+            .build(logger);
+        }
+        return new PaimonWork((Split) split);
+      } catch (ClassNotFoundException e) {
+        logger.error("Failed to deserialize Paimon Split: {}", e.getMessage(), e);
+        throw UserException.dataReadError(e)
+          .message("Failed to deserialize Paimon Split: %s", e.getMessage())
+          .build(logger);
+      }
+    }
+  }
+
+  public static class PaimonWorkSerializer extends StdSerializer<PaimonWork> {
+
+    public static final String SPLIT_FIELD = "split";
+
+    public PaimonWorkSerializer() {
+      super(PaimonWork.class);
+    }
+
+    @Override
+    public void serialize(PaimonWork value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+      gen.writeStartObject();
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+           ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+        oos.writeObject(value.split);
+        oos.flush();
+        gen.writeStringField(SPLIT_FIELD, Base64.getEncoder().encodeToString(baos.toByteArray()));
+      }
+      gen.writeEndObject();
+    }
+  }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java
new file mode 100644
index 0000000000..43b0e58854
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatLocationTransformer;
+
+import java.util.function.Function;
+
+public class PaimonFormatLocationTransformer implements FormatLocationTransformer {
+  public static final FormatLocationTransformer INSTANCE = new PaimonFormatLocationTransformer();
+
+  // Paimon metadata tables are addressed via suffix: e.g. /path/to/table#snapshots
+  public static final String METADATA_SEPARATOR = "#";
+
+  @Override
+  public boolean canTransform(String location) {
+    return getMetadataType(location) != null;
+  }
+
+  private PaimonMetadataType getMetadataType(String location) {
+    String metadataType = StringUtils.substringAfterLast(location, METADATA_SEPARATOR);
+    if (StringUtils.isNotEmpty(metadataType)) {
+      return PaimonMetadataType.from(metadataType);
+    }
+    return null;
+  }
+
+  @Override
+  public FileSelection transform(String location, Function<String, FileSelection> selectionFactory) {
+    PaimonMetadataType metadataType = getMetadataType(location);
+    location = StringUtils.substringBeforeLast(location, METADATA_SEPARATOR);
+    FileSelection fileSelection = selectionFactory.apply(location);
+    if (fileSelection == null) {
+      return null;
+    }
+    // Preserve metadata type while keeping the base path selection.
+    return new PaimonMetadataFileSelection(fileSelection, metadataType);
+  }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java
new file mode 100644
index 0000000000..bacdbe3c73
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatLocationTransformer;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public class PaimonFormatMatcher extends FormatMatcher {
+  private static final String SNAPSHOT_DIR_NAME = "snapshot";
+  private static final String SCHEMA_DIR_NAME = "schema";
+
+  private final PaimonFormatPlugin formatPlugin;
+
+  public PaimonFormatMatcher(PaimonFormatPlugin formatPlugin) {
+    this.formatPlugin = formatPlugin;
+  }
+
+  @Override
+  public boolean supportDirectoryReads() {
+    return true;
+  }
+
+  @Override
+  public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, 
FileSystemPlugin fsPlugin,
+    String storageEngineName, SchemaConfig schemaConfig) throws IOException {
+    Path selectionRoot = selection.getSelectionRoot();
+    Path snapshotDir = new Path(selectionRoot, SNAPSHOT_DIR_NAME);
+    Path schemaDir = new Path(selectionRoot, SCHEMA_DIR_NAME);
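+    // A directory is recognized as a Paimon table root when it contains both
+    // 'snapshot' and 'schema' subdirectories.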
+    if (fs.isDirectory(selectionRoot)
+      && fs.exists(snapshotDir) && fs.isDirectory(snapshotDir)
+      && fs.exists(schemaDir) && fs.isDirectory(schemaDir)) {
+      FormatSelection formatSelection = new FormatSelection(formatPlugin.getConfig(), selection);
+      return new PluginDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
+        formatSelection, formatPlugin.getConvention());
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isFileReadable(DrillFileSystem fs, FileStatus status) {
+    return false;
+  }
+
+  @Override
+  public FormatPlugin getFormatPlugin() {
+    return formatPlugin;
+  }
+
+  @Override
+  public int priority() {
+    return HIGH_PRIORITY;
+  }
+
+  @Override
+  public FormatLocationTransformer getFormatLocationTransformer() {
+    return PaimonFormatLocationTransformer.INSTANCE;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java
new file mode 100644
index 0000000000..d778b3f88c
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PaimonFormatPlugin implements FormatPlugin {
+
+  private static final String PAIMON_CONVENTION_PREFIX = "PAIMON.";
+
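+  // Monotonically increasing suffix so each plugin instance registers a distinct Calcite convention.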
+  private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+  private final FileSystemConfig storageConfig;
+
+  private final PaimonFormatPluginConfig config;
+
+  private final Configuration fsConf;
+
+  private final DrillbitContext context;
+
+  private final String name;
+
+  private final PaimonFormatMatcher matcher;
+
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+  public PaimonFormatPlugin(
+    String name,
+    DrillbitContext context,
+    Configuration fsConf,
+    FileSystemConfig storageConfig,
+    PaimonFormatPluginConfig config) {
+    this.storageConfig = storageConfig;
+    this.config = config;
+    this.fsConf = fsConf;
+    this.context = context;
+    this.name = name;
+    this.matcher = new PaimonFormatMatcher(this);
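+    // A unique suffix keeps the Calcite convention name distinct for each plugin instance.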
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, PaimonPluginImplementor::new))
+      .supportsFilterPushdown(true)
+      .supportsProjectPushdown(true)
+      .supportsLimitPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAutoPartitioning() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+    throw new UnsupportedOperationException("Paimon format plugin does not support writes");
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
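+    // Pushdown rules participate in both logical and physical planning;
+    // the pruning and join phases need nothing Paimon-specific.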
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return PaimonGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .path(getPath(selection))
+      .columns(columns)
+      .maxRecords(-1)
+      .build();
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+    List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+    SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+    TupleMetadata schema = null;
+    if (schemaProvider != null) {
+      schema = schemaProvider.read().getSchema();
+    }
+    return PaimonGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .schema(schema)
+      .path(getPath(selection))
+      .columns(columns)
+      .maxRecords(-1)
+      .build();
+  }
+
+  @Override
+  public boolean supportsStatistics() {
+    return false;
+  }
+
+  @Override
+  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+    throw new UnsupportedOperationException("Paimon format plugin does not support reading statistics");
+  }
+
+  @Override
+  public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
+    throw new UnsupportedOperationException("Paimon format plugin does not support writing statistics");
+  }
+
+  @Override
+  public PaimonFormatPluginConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public FileSystemConfig getStorageConfig() {
+    return storageConfig;
+  }
+
+  @Override
+  public Configuration getFsConf() {
+    return fsConf;
+  }
+
+  @Override
+  public DrillbitContext getContext() {
+    return context;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  public Convention getConvention() {
+    return storagePluginRulesSupplier.convention();
+  }
+
+  private String getPath(FileSelection selection) {
+    String path = selection.getSelectionRoot().toString();
+    if (selection instanceof PaimonMetadataFileSelection) {
+      PaimonMetadataFileSelection metadataFileSelection = (PaimonMetadataFileSelection) selection;
+      // Map dfs.`/path/table#snapshots` to Paimon system table.
+      path = String.format("%s%s%s", path, PaimonFormatLocationTransformer.METADATA_SEPARATOR,
+        metadataFileSelection.getMetadataType().getName());
+    }
+    return path;
+  }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java
new file mode 100644
index 0000000000..f30c0b4f82
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(PaimonFormatPluginConfig.NAME)
+@JsonDeserialize(builder = PaimonFormatPluginConfig.PaimonFormatPluginConfigBuilder.class)
+public class PaimonFormatPluginConfig implements FormatPluginConfig {
+
+  public static final String NAME = "paimon";
+
+  private final Map<String, String> properties;
+
+  // Time travel: load a specific snapshot id.
+  private final Long snapshotId;
+
+  // Time travel: load the latest snapshot at or before the given timestamp (millis).
+  private final Long snapshotAsOfTime;
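+  // Illustrative storage config entries (values below are examples only):
+  //   "paimon": { "type": "paimon", "snapshotId": 3 }
+  //   "paimon": { "type": "paimon", "snapshotAsOfTime": 1704067200000 }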
+
+  @JsonCreator
+  public PaimonFormatPluginConfig(PaimonFormatPluginConfigBuilder builder) {
+    this.properties = builder.properties;
+    this.snapshotId = builder.snapshotId;
+    this.snapshotAsOfTime = builder.snapshotAsOfTime;
+  }
+
+  public static PaimonFormatPluginConfigBuilder builder() {
+    return new PaimonFormatPluginConfigBuilder();
+  }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public Long getSnapshotId() {
+    return snapshotId;
+  }
+
+  public Long getSnapshotAsOfTime() {
+    return snapshotAsOfTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PaimonFormatPluginConfig that = (PaimonFormatPluginConfig) o;
+    return Objects.equals(properties, that.properties)
+      && Objects.equals(snapshotId, that.snapshotId)
+      && Objects.equals(snapshotAsOfTime, that.snapshotAsOfTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(properties, snapshotId, snapshotAsOfTime);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("properties", properties)
+      .field("snapshotId", snapshotId)
+      .field("snapshotAsOfTime", snapshotAsOfTime)
+      .toString();
+  }
+
+  @JsonPOJOBuilder(withPrefix = "")
+  public static class PaimonFormatPluginConfigBuilder {
+    private Map<String, String> properties;
+
+    private Long snapshotId;
+
+    private Long snapshotAsOfTime;
+
+    public PaimonFormatPluginConfigBuilder properties(Map<String, String> properties) {
+      this.properties = properties;
+      return this;
+    }
+
+    public PaimonFormatPluginConfigBuilder snapshotId(Long snapshotId) {
+      this.snapshotId = snapshotId;
+      return this;
+    }
+
+    public PaimonFormatPluginConfigBuilder snapshotAsOfTime(Long snapshotAsOfTime) {
+      this.snapshotAsOfTime = snapshotAsOfTime;
+      return this;
+    }
+
+    public PaimonFormatPluginConfig build() {
+      return new PaimonFormatPluginConfig(this);
+    }
+  }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java
new file mode 100644
index 0000000000..ed90d18d92
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.drill.exec.store.dfs.FileSelection;
+
+public class PaimonMetadataFileSelection extends FileSelection {
+  private final PaimonMetadataType metadataType;
+
+  protected PaimonMetadataFileSelection(FileSelection selection, PaimonMetadataType metadataType) {
+    super(selection);
+    this.metadataType = metadataType;
+  }
+
+  public PaimonMetadataType getMetadataType() {
+    return metadataType;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java
new file mode 100644
index 0000000000..0cdec7345c
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import java.util.Locale;
+
+/**
+ * Paimon system tables exposed via table path suffix, e.g. /path/table#snapshots.
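+ * For example, {@code SELECT * FROM dfs.`/path/table#snapshots`} reads the
+ * table's snapshot history rather than its data rows.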
+ */
+public enum PaimonMetadataType {
+  SNAPSHOTS("snapshots"),
+  SCHEMAS("schemas"),
+  FILES("files"),
+  MANIFESTS("manifests");
+
+  private final String name;
+
+  PaimonMetadataType(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public static PaimonMetadataType from(String value) {
+    if (value == null) {
+      return null;
+    }
+    String normalized = value.toLowerCase(Locale.ROOT);
+    for (PaimonMetadataType type : values()) {
+      if (type.name.equals(normalized)) {
+        return type;
+      }
+    }
+    return null;
+  }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java
new file mode 100644
index 0000000000..16942fe4c8
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.plan;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.NullExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Optional;
+
+public class DrillExprToPaimonTranslator
+  extends AbstractExprVisitor<Object, DrillExprToPaimonTranslator.Context, RuntimeException> {
+
+  // Translate Drill logical expressions into Paimon predicates for filter pushdown.
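+  // For example, `id` > 5 AND `name` IS NOT NULL becomes
+  // and(greaterThan(id, 5), isNotNull(name)) built via Paimon's PredicateBuilder;
+  // any sub-expression that cannot be translated yields null, leaving the
+  // filter to be evaluated by Drill instead.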
+  public static final ExprVisitor<Object, Context, RuntimeException> INSTANCE =
+    new DrillExprToPaimonTranslator();
+
+  public static Predicate translate(LogicalExpression expression, RowType rowType) {
+    if (expression == null || rowType == null) {
+      return null;
+    }
+    Object value = expression.accept(INSTANCE, new Context(rowType));
+    return value instanceof Predicate ? (Predicate) value : null;
+  }
+
+  @Override
+  public Object visitBooleanOperator(BooleanOperator op, Context context) {
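+    // Drill's BooleanOperator is n-ary, so a chain of AND/OR arrives as one
+    // operator; operands are folded pairwise below, and a single
+    // untranslatable operand invalidates the whole pushdown.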
+    if (!FunctionNames.AND.equals(op.getName()) && !FunctionNames.OR.equals(op.getName())) {
+      return null;
+    }
+    Predicate result = null;
+    for (LogicalExpression arg : op.args()) {
+      Predicate predicate = asPredicate(arg.accept(this, context));
+      if (predicate == null) {
+        return null;
+      }
+      if (result == null) {
+        result = predicate;
+      } else {
+        result = FunctionNames.AND.equals(op.getName())
+          ? PredicateBuilder.and(result, predicate)
+          : PredicateBuilder.or(result, predicate);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public Object visitFunctionCall(FunctionCall call, Context context) {
+    switch (call.getName()) {
+      case FunctionNames.AND: {
+        Predicate left = asPredicate(call.arg(0).accept(this, context));
+        Predicate right = asPredicate(call.arg(1).accept(this, context));
+        if (left != null && right != null) {
+          return PredicateBuilder.and(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.OR: {
+        Predicate left = asPredicate(call.arg(0).accept(this, context));
+        Predicate right = asPredicate(call.arg(1).accept(this, context));
+        if (left != null && right != null) {
+          return PredicateBuilder.or(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.NOT: {
+        Predicate predicate = asPredicate(call.arg(0).accept(this, context));
+        if (predicate == null) {
+          return null;
+        }
+        Optional<Predicate> negated = predicate.negate();
+        return negated.orElse(null);
+      }
+      case FunctionNames.IS_NULL: {
+        Object arg = call.arg(0).accept(this, context);
+        if (arg instanceof SchemaPath) {
+          return buildIsNullPredicate(context, (SchemaPath) arg, true);
+        }
+        return null;
+      }
+      case FunctionNames.IS_NOT_NULL: {
+        Object arg = call.arg(0).accept(this, context);
+        if (arg instanceof SchemaPath) {
+          return buildIsNullPredicate(context, (SchemaPath) arg, false);
+        }
+        return null;
+      }
+      case FunctionNames.LT:
+        return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.LT);
+      case FunctionNames.LE:
+        return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.LE);
+      case FunctionNames.GT:
+        return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.GT);
+      case FunctionNames.GE:
+        return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.GE);
+      case FunctionNames.EQ:
+        return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.EQ);
+      case FunctionNames.NE:
+        return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.NE);
+      default:
+        return null;
+    }
+  }
+
+  @Override
+  public Object visitSchemaPath(SchemaPath path, Context context) {
+    return path;
+  }
+
+  @Override
+  public Object visitBooleanConstant(ValueExpressions.BooleanExpression e, Context context) {
+    return new LiteralValue(e.getBoolean());
+  }
+
+  @Override
+  public Object visitFloatConstant(ValueExpressions.FloatExpression fExpr, Context context) {
+    return new LiteralValue(fExpr.getFloat());
+  }
+
+  @Override
+  public Object visitIntConstant(ValueExpressions.IntExpression intExpr, Context context) {
+    return new LiteralValue(intExpr.getInt());
+  }
+
+  @Override
+  public Object visitLongConstant(ValueExpressions.LongExpression longExpr, Context context) {
+    return new LiteralValue(longExpr.getLong());
+  }
+
+  @Override
+  public Object visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Context context) {
+    return new LiteralValue(dExpr.getDouble());
+  }
+
+  @Override
+  public Object visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Context context) {
+    return new LiteralValue(BigDecimal.valueOf(decExpr.getIntFromDecimal(), decExpr.getScale()));
+  }
+
+  @Override
+  public Object visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Context context) {
+    return new LiteralValue(BigDecimal.valueOf(decExpr.getLongFromDecimal(), decExpr.getScale()));
+  }
+
+  @Override
+  public Object visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Context context) {
+    return new LiteralValue(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Object visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Context context) {
+    return new LiteralValue(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Object visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Context context) {
+    return new LiteralValue(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Object visitDateConstant(ValueExpressions.DateExpression dateExpr, Context context) {
+    return new LiteralValue(new Date(dateExpr.getDate()));
+  }
+
+  @Override
+  public Object visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Context context) {
+    return new LiteralValue(new Time(timeExpr.getTime()));
+  }
+
+  @Override
+  public Object visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Context context) {
+    return new LiteralValue(new Timestamp(timestampExpr.getTimeStamp()));
+  }
+
+  @Override
+  public Object visitQuotedStringConstant(ValueExpressions.QuotedString e, Context context) {
+    return new LiteralValue(e.getString());
+  }
+
+  @Override
+  public Object visitNullExpression(NullExpression e, Context context) {
+    return new LiteralValue(null);
+  }
+
+  @Override
+  public Object visitUnknown(LogicalExpression e, Context context) {
+    return null;
+  }
+
+  private Predicate buildComparisonPredicate(Context context, LogicalExpression leftExpr,
+    LogicalExpression rightExpr, Comparison comparison) {
+    Object left = leftExpr.accept(this, context);
+    Object right = rightExpr.accept(this, context);
+    return buildComparisonPredicate(context, left, right, comparison);
+  }
+
+  private Predicate buildComparisonPredicate(Context context, Object left, Object right, Comparison comparison) {
+    if (left instanceof SchemaPath && right instanceof LiteralValue) {
+      return buildPredicate(context, (SchemaPath) left, comparison, (LiteralValue) right);
+    }
+    if (right instanceof SchemaPath && left instanceof LiteralValue) {
+      return buildPredicate(context, (SchemaPath) right, comparison.flip(), (LiteralValue) left);
+    }
+    return null;
+  }
+
+  private Predicate buildPredicate(Context context, SchemaPath path, Comparison comparison, LiteralValue literalValue) {
+    int index = columnIndex(context, path);
+    if (index < 0) {
+      return null;
+    }
+    Object value = literalValue.value();
+    if (value == null) {
+      return null;
+    }
+    DataField field = context.rowType.getFields().get(index);
+    Object internalValue;
+    try {
+      internalValue = PredicateBuilder.convertJavaObject(field.type(), value);
+    } catch (RuntimeException e) {
+      return null;
+    }
+    switch (comparison) {
+      case EQ:
+        return context.predicateBuilder.equal(index, internalValue);
+      case NE:
+        return context.predicateBuilder.notEqual(index, internalValue);
+      case LT:
+        return context.predicateBuilder.lessThan(index, internalValue);
+      case LE:
+        return context.predicateBuilder.lessOrEqual(index, internalValue);
+      case GT:
+        return context.predicateBuilder.greaterThan(index, internalValue);
+      case GE:
+        return context.predicateBuilder.greaterOrEqual(index, internalValue);
+      default:
+        return null;
+    }
+  }
+
+  private Predicate buildIsNullPredicate(Context context, SchemaPath path, boolean isNull) {
+    int index = columnIndex(context, path);
+    if (index < 0) {
+      return null;
+    }
+    return isNull
+      ? context.predicateBuilder.isNull(index)
+      : context.predicateBuilder.isNotNull(index);
+  }
+
+  private int columnIndex(Context context, SchemaPath path) {
+    PathSegment segment = path.getRootSegment();
+    if (segment == null || !segment.isNamed() || segment.getChild() != null) {
+      return -1;
+    }
+    return context.predicateBuilder.indexOf(segment.getNameSegment().getPath());
+  }
+
+  private Predicate asPredicate(Object value) {
+    return value instanceof Predicate ? (Predicate) value : null;
+  }
+
+  private enum Comparison {
+    EQ,
+    NE,
+    LT,
+    LE,
+    GT,
+    GE;
+
+    public Comparison flip() {
+      switch (this) {
+        case LT:
+          return GT;
+        case LE:
+          return GE;
+        case GT:
+          return LT;
+        case GE:
+          return LE;
+        case EQ:
+        case NE:
+        default:
+          return this;
+      }
+    }
+  }
+
+  public static class Context {
+    private final RowType rowType;
+    private final PredicateBuilder predicateBuilder;
+
+    public Context(RowType rowType) {
+      this.rowType = rowType;
+      this.predicateBuilder = new PredicateBuilder(rowType);
+    }
+  }
+
+  private static class LiteralValue {
+    private final Object value;
+
+    private LiteralValue(Object value) {
+      this.value = value;
+    }
+
+    private Object value() {
+      return value;
+    }
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java
new file mode 100644
index 0000000000..e1af514428
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.PaimonTableUtils;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PaimonPluginImplementor extends AbstractPluginImplementor {
+
+  private PaimonGroupScan groupScan;
+
+  @Override
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (PaimonGroupScan) scan.getGroupScan();
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    visitChild(filter.getInput());
+
+    RexNode condition = filter.getCondition();
+    LogicalExpression expression = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      condition);
+    groupScan = groupScan.toBuilder().condition(expression).build();
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    visitChild(project.getInput());
+
+    DrillParseContext context = new DrillParseContext(PrelUtil.getPlannerSettings(project.getCluster().getPlanner()));
+    RelNode input = project.getInput();
+
+    List<SchemaPath> projects = project.getProjects().stream()
+      .map(e -> (SchemaPath) DrillOptiq.toDrill(context, input, e))
+      .collect(Collectors.toList());
+    groupScan = groupScan.clone(projects);
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    visitChild(limit.getInput());
+    int maxRecords = getArtificialLimit(limit);
+    if (maxRecords >= 0) {
+      groupScan = groupScan.applyLimit(maxRecords);
+    }
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
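+    // Claim the filter for pushdown only when it translates completely into a
+    // Paimon predicate against the table's row type; otherwise Drill keeps it.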
+    RexNode condition = filter.getCondition();
+    LogicalExpression logicalExpression = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      condition);
+    GroupScan scan = findGroupScan(filter);
+    if (!(scan instanceof PaimonGroupScan)) {
+      return false;
+    }
+    PaimonGroupScan paimonGroupScan = (PaimonGroupScan) scan;
+    try {
+      Table table = PaimonTableUtils.loadTable(paimonGroupScan.getFormatPlugin(), paimonGroupScan.getPath());
+      RowType rowType = table.rowType();
+      return DrillExprToPaimonTranslator.translate(logicalExpression, rowType) != null;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    if (hasPluginGroupScan(limit)) {
+      FirstLimitFinder finder = new FirstLimitFinder();
+      limit.getInput().accept(finder);
+      int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
+      int newLimit = getArtificialLimit(limit);
+      return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean artificialLimit() {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return hasPluginGroupScan(project);
+  }
+
+  @Override
+  public boolean splitProject(Project project) {
+    return true;
+  }
+
+  @Override
+  protected boolean hasPluginGroupScan(RelNode node) {
+    return findGroupScan(node) instanceof PaimonGroupScan;
+  }
+
+  @Override
+  public GroupScan getPhysicalOperator() {
+    return groupScan;
+  }
+
+  @Override
+  protected Class<? extends StoragePlugin> supportedPlugin() {
+    return FileSystemPlugin.class;
+  }
+
+  private int rexLiteralIntValue(RexLiteral offset) {
+    return ((BigDecimal) offset.getValue()).intValue();
+  }
+
+  private int getArtificialLimit(DrillLimitRelBase limit) {
+    return getArtificialLimit(limit.getFetch(), limit.getOffset());
+  }
+
+  private int getArtificialLimit(RexNode fetch, RexNode offset) {
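+    // A LIMIT with an OFFSET still has to scan offset + fetch rows,
+    // so the pushed-down row cap is their sum.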
+    int maxRows = -1;
+    if (fetch != null) {
+      maxRows = rexLiteralIntValue((RexLiteral) fetch);
+      if (offset != null) {
+        maxRows += rexLiteralIntValue((RexLiteral) offset);
+      }
+    }
+    return maxRows;
+  }
+
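+  // Locates the limit nearest the scan, so that canImplement only accepts a
+  // new limit when it is strictly tighter than one already pushed down.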
+  private static class FirstLimitFinder extends RelShuttleImpl {
+    private RexNode fetch;
+
+    private RexNode offset;
+
+    @Override
+    public RelNode visit(RelNode other) {
+      if (other instanceof DrillLimitRelBase) {
+        DrillLimitRelBase limitRelBase = (DrillLimitRelBase) other;
+        fetch = limitRelBase.getFetch();
+        offset = limitRelBase.getOffset();
+        return other;
+      } else if (other instanceof RelSubset) {
+        RelSubset relSubset = (RelSubset) other;
+        Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+      }
+      return super.visit(other);
+    }
+
+    public RexNode getFetch() {
+      return fetch;
+    }
+
+    public RexNode getOffset() {
+      return offset;
+    }
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java
new file mode 100644
index 0000000000..1b411e3ef9
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.LocalZoneTimestamp;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+public class PaimonColumnConverterFactory extends ColumnConverterFactory {
+
+  public PaimonColumnConverterFactory(TupleMetadata providedSchema) {
+    super(providedSchema);
+  }
+
+  @Override
+  public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) {
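+    // Paimon hands back its internal representations (BinaryString, Decimal,
+    // Timestamp); unwrap them into the Java types Drill's column writers
+    // expect. All other types use the default conversion.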
+    switch (readerSchema.type()) {
+      case TIMESTAMP:
+      case TIMESTAMPTZ:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setTimestamp(asInstant(value)));
+      case VARDECIMAL:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setDecimal(asBigDecimal(value)));
+      case VARCHAR:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setString(asString(value)));
+      case VARBINARY:
+        return new ColumnConverter.ScalarColumnConverter(value -> {
+          byte[] bytes = (byte[]) value;
+          writer.setBytes(bytes, bytes.length);
+        });
+      default:
+        return super.buildScalar(readerSchema, writer);
+    }
+  }
+
+  public static ColumnMetadata getColumnMetadata(DataField field) {
+    DataType type = field.type();
+    String name = field.name();
+    TypeProtos.DataMode dataMode = type.isNullable()
+      ? TypeProtos.DataMode.OPTIONAL
+      : TypeProtos.DataMode.REQUIRED;
+    return getColumnMetadata(name, type, dataMode);
+  }
+
+  public static TupleSchema convertSchema(RowType rowType) {
+    TupleSchema schema = new TupleSchema();
+    for (DataField field : rowType.getFields()) {
+      ColumnMetadata columnMetadata = getColumnMetadata(field);
+      schema.add(columnMetadata);
+    }
+    return schema;
+  }
+
+  private static ColumnMetadata getColumnMetadata(String name, DataType type, TypeProtos.DataMode dataMode) {
+    DataTypeRoot typeRoot = type.getTypeRoot();
+    switch (typeRoot) {
+      case ARRAY:
+      case MAP:
+      case MULTISET:
+      case ROW:
+      case VARIANT:
+        throw new UnsupportedOperationException(String.format("Unsupported type: %s for column: %s", typeRoot, name));
+      default:
+        return getPrimitiveMetadata(name, type, dataMode);
+    }
+  }
+
+  private static ColumnMetadata getPrimitiveMetadata(String name, DataType type, TypeProtos.DataMode dataMode) {
+    TypeProtos.MinorType minorType = getType(type.getTypeRoot());
+    if (minorType == null) {
+      throw new UnsupportedOperationException(String.format("Unsupported type: %s for column: %s", type.getTypeRoot(), name));
+    }
+    TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder()
+      .setMinorType(minorType)
+      .setMode(dataMode);
+    if (type.getTypeRoot() == DataTypeRoot.DECIMAL) {
+      DecimalType decimalType = (DecimalType) type;
+      builder.setScale(decimalType.getScale())
+        .setPrecision(decimalType.getPrecision());
+    }
+    MaterializedField materializedField = MaterializedField.create(name, builder.build());
+    return MetadataUtils.fromField(materializedField);
+  }
+
+  private static TypeProtos.MinorType getType(DataTypeRoot typeRoot) {
+    switch (typeRoot) {
+      case BOOLEAN:
+        return TypeProtos.MinorType.BIT;
+      case TINYINT:
+        return TypeProtos.MinorType.TINYINT;
+      case SMALLINT:
+        return TypeProtos.MinorType.SMALLINT;
+      case INTEGER:
+        return TypeProtos.MinorType.INT;
+      case BIGINT:
+        return TypeProtos.MinorType.BIGINT;
+      case FLOAT:
+        return TypeProtos.MinorType.FLOAT4;
+      case DOUBLE:
+        return TypeProtos.MinorType.FLOAT8;
+      case DATE:
+        return TypeProtos.MinorType.DATE;
+      case TIME_WITHOUT_TIME_ZONE:
+        return TypeProtos.MinorType.TIME;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return TypeProtos.MinorType.TIMESTAMP;
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return TypeProtos.MinorType.TIMESTAMPTZ;
+      case CHAR:
+      case VARCHAR:
+        return TypeProtos.MinorType.VARCHAR;
+      case BINARY:
+      case VARBINARY:
+        return TypeProtos.MinorType.VARBINARY;
+      case DECIMAL:
+        return TypeProtos.MinorType.VARDECIMAL;
+      default:
+        return null;
+    }
+  }
+
+  private static String asString(Object value) {
+    if (value instanceof BinaryString) {
+      return value.toString();
+    }
+    return (String) value;
+  }
+
+  private static BigDecimal asBigDecimal(Object value) {
+    if (value instanceof Decimal) {
+      return ((Decimal) value).toBigDecimal();
+    }
+    return (BigDecimal) value;
+  }
+
+  private static Instant asInstant(Object value) {
+    if (value instanceof Timestamp) {
+      return ((Timestamp) value).toInstant();
+    }
+    if (value instanceof LocalZoneTimestamp) {
+      return ((LocalZoneTimestamp) value).toInstant();
+    }
+    if (value instanceof java.sql.Timestamp) {
+      return ((java.sql.Timestamp) value).toInstant();
+    }
+    if (value instanceof Long) {
+      return Instant.ofEpochMilli((Long) value);
+    }
+    return (Instant) value;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java
new file mode 100644
index 0000000000..312838556d
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.paimon.PaimonTableUtils;
+import org.apache.drill.exec.store.paimon.PaimonWork;
+import org.apache.drill.exec.store.paimon.PaimonReadUtils;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PaimonRecordReader implements ManagedReader {
+  private static final Logger logger = LoggerFactory.getLogger(PaimonRecordReader.class);
+
+  private final PaimonFormatPlugin formatPlugin;
+
+  private final String path;
+
+  private final List<SchemaPath> columns;
+
+  private final LogicalExpression condition;
+
+  private final PaimonWork work;
+
+  private final int maxRecords;
+
+  private ResultSetLoader loader;
+
+  private ColumnConverter[] converters;
+
+  private InternalRow.FieldGetter[] getters;
+
+  private RecordReader<InternalRow> recordReader;
+
+  private RecordReader.RecordIterator<InternalRow> currentBatch;
+
+  private OperatorStats stats;
+
+  private int lastSchemaVersion = -1;
+
+  public PaimonRecordReader(PaimonFormatPlugin formatPlugin, String path,
+    List<SchemaPath> columns, LogicalExpression condition, PaimonWork work, int maxRecords,
+    SchemaNegotiator negotiator) {
+    this.formatPlugin = formatPlugin;
+    this.path = path;
+    this.columns = columns;
+    this.condition = condition;
+    this.work = work;
+    this.maxRecords = maxRecords;
+    try {
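+      // Build the read pipeline: push the filter and projection into Paimon's
+      // ReadBuilder, then derive the Drill schema from the resulting read type.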
+      Table table = PaimonTableUtils.loadTable(formatPlugin, path);
+      RowType rowType = table.rowType();
+      ReadBuilder readBuilder = table.newReadBuilder();
+      PaimonReadUtils.applyFilter(readBuilder, rowType, condition);
+      PaimonReadUtils.applyProjection(readBuilder, rowType, columns);
+      RowType readType = readBuilder.readType();
+
+      TupleSchema tableSchema = PaimonColumnConverterFactory.convertSchema(readType);
+      TupleMetadata providedSchema = negotiator.providedSchema();
+      TupleMetadata tupleSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, tableSchema);
+      negotiator.tableSchema(tupleSchema, true);
+      loader = negotiator.build();
+
+      converters = buildConverters(providedSchema, tableSchema);
+      getters = buildGetters(readType);
+
+      recordReader = readBuilder.newRead().executeFilter().createReader(work.getSplit());
+      currentBatch = null;
+      stats = negotiator.context().getStats();
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+        .message("Failed to open Paimon reader for %s", path)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public boolean next() {
+    RowSetLoader rowWriter = loader.writer();
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        updateStats(rowWriter);
+        return false;
+      }
+    }
+    updateStats(rowWriter);
+    return true;
+  }
+
+  @Override
+  public void close() {
+    if (currentBatch != null) {
+      currentBatch.releaseBatch();
+    }
+    AutoCloseables.closeSilently(recordReader);
+    if (loader != null) {
+      loader.close();
+    }
+  }
+
+  private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
+    InternalRow row = nextRow();
+    if (row == null) {
+      return false;
+    }
+
+    rowWriter.start();
+    for (int i = 0; i < getters.length; i++) {
+      converters[i].convert(getters[i].getFieldOrNull(row));
+    }
+    rowWriter.save();
+
+    return true;
+  }
+
+  private InternalRow nextRow() {
+    try {
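+      // Advance batch by batch: a null batch signals end of data, while an
+      // exhausted batch is released before the next one is fetched.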
+      while (true) {
+        if (currentBatch == null) {
+          currentBatch = recordReader.readBatch();
+          if (currentBatch == null) {
+            return null;
+          }
+        }
+        InternalRow row = currentBatch.next();
+        if (row != null) {
+          return row;
+        }
+        currentBatch.releaseBatch();
+        currentBatch = null;
+      }
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+        .message("Failed to read Paimon data for %s", path)
+        .build(logger);
+    }
+  }
+
+  private void updateStats(RowSetLoader rowWriter) {
+    if (stats == null) {
+      return;
+    }
+    int rowCount = rowWriter.rowCount();
+    if (rowCount == 0) {
+      return;
+    }
+    int schemaVersion = loader.schemaVersion();
+    boolean isNewSchema = schemaVersion != lastSchemaVersion;
+    lastSchemaVersion = schemaVersion;
+    stats.batchReceived(0, rowCount, isNewSchema);
+  }
+
+  private ColumnConverter[] buildConverters(TupleMetadata providedSchema, TupleSchema tableSchema) {
+    ColumnConverterFactory factory = new PaimonColumnConverterFactory(providedSchema);
+    ColumnConverter[] columnConverters = new ColumnConverter[tableSchema.size()];
+    for (int i = 0; i < tableSchema.size(); i++) {
+      ColumnMetadata columnMetadata = tableSchema.metadata(i);
+      columnConverters[i] = factory.getConverter(providedSchema, columnMetadata,
+        loader.writer().column(columnMetadata.name()));
+    }
+    return columnConverters;
+  }
+
+  private InternalRow.FieldGetter[] buildGetters(RowType readType) {
+    List<DataField> fields = readType.getFields();
+    InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      fieldGetters[i] = InternalRow.createFieldGetter(fields.get(i).type(), i);
+    }
+    return fieldGetters;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java
new file mode 100644
index 0000000000..ee3e2f5af6
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.paimon.PaimonSubScan;
+import org.apache.drill.exec.store.paimon.PaimonWork;
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class PaimonScanBatchCreator implements BatchCreator<PaimonSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+    PaimonSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    try {
+      ScanLifecycleBuilder builder = createBuilder(subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      throw e;
+    } catch (Throwable e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private ScanLifecycleBuilder createBuilder(PaimonSubScan subScan) {
+    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
+    builder.projection(subScan.getColumns());
+    builder.userName(subScan.getUserName());
+    builder.providedSchema(subScan.getSchema());
+    builder.readerFactory(new PaimonReaderFactory(subScan));
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
+  }
+
+  private static class PaimonReaderFactory implements ReaderFactory<SchemaNegotiator> {
+    private final PaimonSubScan subScan;
+    private final Iterator<PaimonWork> workIterator;
+
+    private PaimonReaderFactory(PaimonSubScan subScan) {
+      this.subScan = subScan;
+      this.workIterator = subScan.getWorkList().iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return workIterator.hasNext();
+    }
+
+    @Override
+    public ManagedReader next(SchemaNegotiator negotiator) {
+      return new PaimonRecordReader(subScan.getFormatPlugin(), subScan.getPath(),
+        subScan.getColumns(), subScan.getCondition(), workIterator.next(), subScan.getMaxRecords(),
+        negotiator);
+    }
+  }
+}
diff --git a/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json b/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000000..786d5fd747
--- /dev/null
+++ b/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,20 @@
+{
+  "storage":{
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "paimon": {
+          "type": "paimon"
+        }
+      }
+    },
+    "s3": {
+      "type": "file",
+      "formats": {
+        "paimon": {
+          "type": "paimon"
+        }
+      }
+    }
+  }
+}
diff --git a/contrib/format-paimon/src/main/resources/drill-module.conf b/contrib/format-paimon/src/main/resources/drill-module.conf
new file mode 100644
index 0000000000..44156959a3
--- /dev/null
+++ b/contrib/format-paimon/src/main/resources/drill-module.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when scanning the classpath.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.paimon"
+}
diff --git a/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java b/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java
new file mode 100644
index 0000000000..1a500ca4f2
--- /dev/null
+++ b/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PaimonQueriesTest extends ClusterTest {
+
+  private static final String DB_NAME = "default";
+  private static final String TABLE_NAME = "append_table";
+  private static final String PK_TABLE_NAME = "pk_table";
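+  // Matches the Paimon group scan in EXPLAIN output under either spelling
+  // the plan printer may use.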
+  private static final String PAIMON_SCAN_PATTERN = "(PAIMON_GROUP_SCAN|PaimonGroupScan)";
+  private static String tableRelativePath;
+  private static String pkTableRelativePath;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
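+    // Register the Paimon format on the dfs storage plugin so the test
+    // tables created under dfs.tmp resolve to this format plugin.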
+    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+    Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+    formats.put("paimon", PaimonFormatPluginConfig.builder().build());
+    FileSystemConfig newPluginConfig = new FileSystemConfig(
+      pluginConfig.getConnection(),
+      pluginConfig.getConfig(),
+      pluginConfig.getWorkspaces(),
+      formats,
+      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+    pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+    tableRelativePath = createAppendTable();
+    pkTableRelativePath = createPrimaryKeyTable();
+  }
+
+  @Test
+  public void testReadAppendTable() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s`", 
tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testReadPrimaryKeyTable() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s`", 
pkTableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "dave")
+      .addRow(2, "erin")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testProjectionPushdown() throws Exception {
+    String query = String.format("select name from dfs.tmp.`%s`", 
tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*columns=\\[.*name.*\\]")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("alice")
+      .addRow("bob")
+      .addRow("carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testMultiColumnProjection() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s`", 
tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdown() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id = 
2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownGT() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id > 
1", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownLT() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id < 
3", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*3")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownGE() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id 
>= 2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownLE() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id 
<= 2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownNE() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id 
<> 2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownAnd() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id > 
1 and id < 3", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanAnd")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownOr() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id = 
1 or id = 3", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanOr")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownNot() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where not 
(id = 2)", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String query = String.format("select id from dfs.tmp.`%s` limit 2", 
tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*maxRecords=2")
+      .match(true);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(2, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testCombinedPushdownFilterProjectionLimit() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id > 
1 limit 1", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1")
+      .include(PAIMON_SCAN_PATTERN + ".*maxRecords=1")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectWildcard() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s`", 
tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectWithOrderBy() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` order by 
id desc", tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(3, "carol")
+      .addRow(2, "bob")
+      .addRow(1, "alice")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectWithCount() throws Exception {
+    String query = String.format("select count(*) from dfs.tmp.`%s`", 
tableRelativePath);
+
+    assertEquals(3, queryBuilder().sql(query).singletonLong());
+  }
+
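+  // Round-trips the physical plan through its JSON form to verify that
+  // PaimonGroupScan serializes and deserializes correctly.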
+  @Test
+  public void testSerDe() throws Exception {
+    String snapshotQuery = String.format(
+      "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time 
limit 1", tableRelativePath);
+
+    long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+    String sql = String.format(
+      "select count(*) as cnt from table(dfs.tmp.`%s`(type => 'paimon', 
snapshotId => %d))",
+      tableRelativePath, snapshotId);
+    String plan = queryBuilder().sql(sql).explainJson();
+    long count = queryBuilder().physical(plan).singletonLong();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testInvalidColumnName() throws Exception {
+    String query = String.format("select id, invalid_column from 
dfs.tmp.`%s`", tableRelativePath);
+    try {
+      queryBuilder().sql(query).run();
+      fail("Expected UserRemoteException for invalid column name");
+    } catch (UserRemoteException e) {
+      assertThat(e.getVerboseMessage(), containsString("invalid_column"));
+    }
+  }
+
+  @Test
+  public void testSelectWithSnapshotId() throws Exception {
+    String snapshotQuery = String.format(
+      "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time 
limit 1", tableRelativePath);
+
+    long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+    String query = String.format(
+      "select id, name from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => 
%d))",
+      tableRelativePath, snapshotId);
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectWithSnapshotAsOfTime() throws Exception {
+    String snapshotQuery = String.format(
+      "select commit_time from dfs.tmp.`%s#snapshots` order by commit_time 
limit 1", tableRelativePath);
+
+    long snapshotTime = queryBuilder().sql(snapshotQuery).singletonLong();
+    String query = String.format(
+      "select id, name from table(dfs.tmp.`%s`(type => 'paimon', 
snapshotAsOfTime => %d))",
+      tableRelativePath, snapshotTime);
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectWithSnapshotIdAndSnapshotAsOfTime() throws Exception {
+    String query = String.format(
+      "select * from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d, 
snapshotAsOfTime => %d))",
+      tableRelativePath, 123, 456);
+    try {
+      queryBuilder().sql(query).run();
+      fail();
+    } catch (UserRemoteException e) {
+      assertThat(e.getVerboseMessage(),
+        containsString("Both 'snapshotId' and 'snapshotAsOfTime' cannot be 
specified"));
+    }
+  }
+
+  @Test
+  public void testSelectSnapshotsMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#snapshots`", 
tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectSchemasMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#schemas`", 
tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testSelectFilesMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#files`", 
tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectManifestsMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#manifests`", 
tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  private static String createAppendTable() throws Exception {
+    Path dfsRoot = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath());
+    Path warehouseDir = dfsRoot.resolve("paimon_warehouse");
+
+    Options options = new Options();
+    options.set("warehouse", warehouseDir.toUri().toString());
+    options.set("metastore", "filesystem");
+
+    CatalogContext context = CatalogContext.create(options, new Configuration());
+    try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+      catalog.createDatabase(DB_NAME, true);
+
+      Schema schema = Schema.newBuilder()
+        .column("id", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .build();
+      Identifier identifier = Identifier.create(DB_NAME, TABLE_NAME);
+      catalog.createTable(identifier, schema, false);
+
+      Table table = catalog.getTable(identifier);
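+      // Two separate batch commits create two snapshots; the snapshot
+      // metadata and time-travel tests rely on this.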
+      writeRows(table, Arrays.asList(
+        GenericRow.of(1, BinaryString.fromString("alice")),
+        GenericRow.of(2, BinaryString.fromString("bob"))
+      ));
+      writeRows(table, Arrays.asList(
+        GenericRow.of(3, BinaryString.fromString("carol"))
+      ));
+    }
+
+    Path tablePath = warehouseDir.resolve(DB_NAME + ".db").resolve(TABLE_NAME);
+    Path relativePath = dfsRoot.relativize(tablePath);
+    return relativePath.toString().replace('\\', '/');
+  }
+
+  private static String createPrimaryKeyTable() throws Exception {
+    Path dfsRoot = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath());
+    Path warehouseDir = dfsRoot.resolve("paimon_warehouse");
+
+    Options options = new Options();
+    options.set("warehouse", warehouseDir.toUri().toString());
+    options.set("metastore", "filesystem");
+
+    CatalogContext context = CatalogContext.create(options, new Configuration());
+    try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+      catalog.createDatabase(DB_NAME, true);
+
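+      // A primary-key table pinned to a single fixed bucket; Paimon
+      // deduplicates rows by primary key on read.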
+      Schema schema = Schema.newBuilder()
+        .column("id", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .option(CoreOptions.BUCKET.key(), "1")
+        .primaryKey("id")
+        .build();
+      Identifier identifier = Identifier.create(DB_NAME, PK_TABLE_NAME);
+      catalog.createTable(identifier, schema, false);
+
+      Table table = catalog.getTable(identifier);
+      writeRows(table, Arrays.asList(
+        GenericRow.of(1, BinaryString.fromString("dave")),
+        GenericRow.of(2, BinaryString.fromString("erin"))
+      ));
+    }
+
+    Path tablePath = warehouseDir.resolve(DB_NAME + ".db").resolve(PK_TABLE_NAME);
+    Path relativePath = dfsRoot.relativize(tablePath);
+    return relativePath.toString().replace('\\', '/');
+  }
+
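+  // Paimon batch writes are two-phase: stage rows and prepareCommit() on a
+  // BatchTableWrite, then apply the resulting CommitMessages atomically via
+  // BatchTableCommit.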
+  private static void writeRows(Table table, List<GenericRow> rows) throws 
Exception {
+    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+    List<CommitMessage> messages;
+    try (BatchTableWrite write = writeBuilder.newWrite()) {
+      for (GenericRow row : rows) {
+        write.write(row);
+      }
+      messages = write.prepareCommit();
+    }
+    try (BatchTableCommit commit = writeBuilder.newCommit()) {
+      commit.commit(messages);
+    }
+  }
+
+}
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 50ccdc90dd..4c1e0bfc19 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -51,6 +51,7 @@
     <module>format-image</module>
     <module>format-log</module>
     <module>format-ltsv</module>
+    <module>format-paimon</module>
     <module>format-pcapng</module>
     <module>format-pdf</module>
     <module>format-sas</module>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index a35882c483..e73ec3f7e6 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -537,6 +537,11 @@
           <artifactId>drill-iceberg-format</artifactId>
           <version>${project.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-paimon-format</artifactId>
+          <version>${project.version}</version>
+        </dependency>
         <dependency>
           <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-deltalake-format</artifactId>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 66792fd43f..d30caef9a8 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -48,6 +48,7 @@
         <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
         <include>org.apache.drill.contrib:drill-mongo-storage:jar</include>
         <include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
+        <include>org.apache.drill.contrib:drill-paimon-format:jar</include>
         <include>org.apache.drill.contrib:drill-storage-cassandra:jar</include>
         
<include>org.apache.drill.contrib:drill-storage-elasticsearch:jar</include>
         
<include>org.apache.drill.contrib:drill-storage-googlesheets:jar</include>
diff --git a/pom.xml b/pom.xml
index 2ab9e8138c..85b2bcb457 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@
     <!-- Please keep this list sorted. -->
     <aircompressor.version>0.25</aircompressor.version>
     <antlr.version>4.9.3</antlr.version>
+    <apiguardian.version>1.1.2</apiguardian.version>
     <asm.version>9.5</asm.version>
     <avatica.version>1.23.0</avatica.version>
     <avro.version>1.12.0</avro.version>
@@ -93,6 +94,7 @@
     <httpclient.version>4.5.14</httpclient.version>
     <httpdlog-parser.version>5.11.0</httpdlog-parser.version>
     <iceberg.version>0.12.1</iceberg.version>
+    <paimon.version>1.3.1</paimon.version>
     <jackson.version>2.18.3</jackson.version>
     <janino.version>3.1.12</janino.version>
     <javassist.version>3.29.2-GA</javassist.version>
@@ -1013,6 +1015,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apiguardian</groupId>
+        <artifactId>apiguardian-api</artifactId>
+        <version>${apiguardian.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.calcite.avatica</groupId>
         <artifactId>avatica-core</artifactId>

