[ https://issues.apache.org/jira/browse/DRILL-8542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18054862#comment-18054862 ]

ASF GitHub Bot commented on DRILL-8542:
---------------------------------------

cgivre commented on code in PR #3035:
URL: https://github.com/apache/drill/pull/3035#discussion_r2736904808


##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;

Review Comment:
   Note: this uses the older version of the `ManagedReader` framework. Please
use `org.apache.drill.exec.physical.impl.scan.v3.ManagedReader`; this will
require refactoring this class a bit.
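
   For context, a minimal sketch of the v3 reader shape, assuming a
hypothetical `PaimonBatchReader`; `buildDrillSchema()` and
`readNextPaimonRow()` are placeholder helpers, not the author's code:

```java
package org.apache.drill.exec.store.paimon.read;

import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;

public class PaimonBatchReader implements ManagedReader {

  private final RowSetLoader rowWriter;

  public PaimonBatchReader(SchemaNegotiator negotiator) {
    // In v3, all setup happens in the constructor: declare the table schema,
    // then build the loader and open the underlying Paimon reader.
    negotiator.tableSchema(buildDrillSchema(), true);
    ResultSetLoader loader = negotiator.build();
    this.rowWriter = loader.writer();
  }

  @Override
  public boolean next() {
    // Fill one batch; return false once the Paimon split is exhausted.
    while (!rowWriter.isFull()) {
      if (!readNextPaimonRow()) {
        return false;
      }
    }
    return true;
  }

  @Override
  public void close() {
    // Release the underlying Paimon reader and any buffers here.
  }

  private TupleMetadata buildDrillSchema() {
    return null; // placeholder: map the Paimon row type to Drill metadata
  }

  private boolean readNextPaimonRow() {
    return false; // placeholder: copy one Paimon row via rowWriter
  }
}
```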



##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(PaimonFormatPluginConfig.NAME)
+@JsonDeserialize(builder = PaimonFormatPluginConfig.PaimonFormatPluginConfigBuilder.class)
+public class PaimonFormatPluginConfig implements FormatPluginConfig {
+
+  public static final String NAME = "paimon";
+
+  private final Map<String, String> properties;
+
+  // Time travel: load a specific snapshot id.
+  private final Long snapshotId;
+
+  // Time travel: load the latest snapshot at or before the given timestamp (millis).
+  private final Long snapshotAsOfTime;
+
+  @JsonCreator
+  public PaimonFormatPluginConfig(PaimonFormatPluginConfigBuilder builder) {
+    this.properties = builder.properties;
+    this.snapshotId = builder.snapshotId;
+    this.snapshotAsOfTime = builder.snapshotAsOfTime;
+  }
+
+  public static PaimonFormatPluginConfigBuilder builder() {
+    return new PaimonFormatPluginConfigBuilder();
+  }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public Long getSnapshotId() {
+    return snapshotId;
+  }
+
+  public Long getSnapshotAsOfTime() {
+    return snapshotAsOfTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PaimonFormatPluginConfig that = (PaimonFormatPluginConfig) o;
+    return Objects.equals(properties, that.properties)
+      && Objects.equals(snapshotId, that.snapshotId)
+      && Objects.equals(snapshotAsOfTime, that.snapshotAsOfTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(properties, snapshotId, snapshotAsOfTime);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("properties", properties)
+      .field("snapshotId", snapshotId)
+      .field("snapshotAsOfTime", snapshotAsOfTime)
+      .toString();
+  }
+
+  @JsonPOJOBuilder(withPrefix = "")
+  public static class PaimonFormatPluginConfigBuilder {
+    private Map<String, String> properties;
+
+    private Long snapshotId;
+
+    private Long snapshotAsOfTime;
+
+    public PaimonFormatPluginConfigBuilder properties(Map<String, String> properties) {
+      this.properties = properties;
+      return this;
+    }
+
+    public PaimonFormatPluginConfigBuilder snapshotId(Long snapshotId) {
+      this.snapshotId = snapshotId;
+      return this;
+    }
+
+    public PaimonFormatPluginConfigBuilder snapshotAsOfTime(Long snapshotAsOfTime) {
+      this.snapshotAsOfTime = snapshotAsOfTime;
+      return this;
+    }
+
+    public PaimonFormatPluginConfig build() {
+      return new PaimonFormatPluginConfig(this);
+    }
+  }
+
+}

Review Comment:
   Please add a newline at the end of all classes, here and elsewhere.



##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PaimonFormatPlugin implements FormatPlugin {
+
+  private static final String PAIMON_CONVENTION_PREFIX = "PAIMON.";
+
+  private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+  private final FileSystemConfig storageConfig;
+
+  private final PaimonFormatPluginConfig config;
+
+  private final Configuration fsConf;
+
+  private final DrillbitContext context;
+
+  private final String name;
+
+  private final PaimonFormatMatcher matcher;
+
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+  public PaimonFormatPlugin(
+    String name,
+    DrillbitContext context,
+    Configuration fsConf,
+    FileSystemConfig storageConfig,
+    PaimonFormatPluginConfig config) {
+    this.storageConfig = storageConfig;
+    this.config = config;
+    this.fsConf = fsConf;
+    this.context = context;
+    this.name = name;
+    this.matcher = new PaimonFormatMatcher(this);
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, PaimonPluginImplementor::new))
+      .supportsFilterPushdown(true)
+      .supportsProjectPushdown(true)
+      .supportsLimitPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAutoPartitioning() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return PaimonGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .path(getPath(selection))
+      .columns(columns)
+      .maxRecords(-1)
+      .build();
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+    List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+    SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+    TupleMetadata schema = schemaProvider != null
+      ? schemaProvider.read().getSchema()
+      : null;
+    return PaimonGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .schema(schema)
+      .path(getPath(selection))
+      .columns(columns)
+      .maxRecords(-1)
+      .build();
+  }
+
+  @Override
+  public boolean supportsStatistics() {
+    return false;
+  }
+
+  @Override
+  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {

Review Comment:
   Take a look at `EasyFormatPlugin`.
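
   Since `supportsStatistics()` already returns false here, one option is the
fail-fast shape `EasyFormatPlugin` uses; a sketch, with an illustrative
message, not the author's code:

```java
@Override
public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
  // Statistics are not supported for Paimon tables, so fail fast with a
  // clear error instead of returning null.
  throw new UnsupportedOperationException("Paimon format plugin does not support statistics");
}
```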



##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PaimonFormatPlugin implements FormatPlugin {

Review Comment:
   Now I'm remembering why this looks overly complex. Was there a reason why
you chose to directly implement `FormatPlugin` instead of extending
`EasyFormatPlugin`?
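
   For comparison, a rough sketch of what the class could collapse to if it
extended `EasyFormatPlugin`, assuming the current `EasyFormatConfig` builder
API; the flag values mirror what the hand-rolled rules supplier above enables:

```java
package org.apache.drill.exec.store.paimon.format;

import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.hadoop.conf.Configuration;

public class PaimonFormatPlugin extends EasyFormatPlugin<PaimonFormatPluginConfig> {

  public static final String PLUGIN_NAME = "paimon";

  public PaimonFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
      StoragePluginConfig storageConfig, PaimonFormatPluginConfig formatConfig) {
    // EasyFormatPlugin supplies matcher, group scan, and rule wiring.
    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
  }

  private static EasyFormatConfig easyConfig(Configuration fsConf, PaimonFormatPluginConfig pluginConfig) {
    return EasyFormatConfig.builder()
      .readable(true)
      .writable(false)
      .blockSplittable(false)
      .compressible(false)
      .supportsProjectPushdown(true)
      .supportsLimitPushdown(true)
      .fsConf(fsConf)
      .defaultName(PLUGIN_NAME)
      .scanVersion(ScanFrameworkVersion.EVF_V2)
      .build();
  }
}
```

   One caveat: `EasyFormatConfig` covers project and limit pushdown, but the
filter pushdown this plugin wires up through `StoragePluginRulesSupplier`
would still need its own planning rules, which may be the reason for
implementing `FormatPlugin` directly.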



##########
contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java:
##########
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PaimonQueriesTest extends ClusterTest {
+
+  private static final String DB_NAME = "default";
+  private static final String TABLE_NAME = "append_table";
+  private static final String PK_TABLE_NAME = "pk_table";
+  private static final String PAIMON_SCAN_PATTERN = "(PAIMON_GROUP_SCAN|PaimonGroupScan)";
+  private static String tableRelativePath;
+  private static String pkTableRelativePath;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+    Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+    formats.put("paimon", PaimonFormatPluginConfig.builder().build());
+    FileSystemConfig newPluginConfig = new FileSystemConfig(
+      pluginConfig.getConnection(),
+      pluginConfig.getConfig(),
+      pluginConfig.getWorkspaces(),
+      formats,
+      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+    pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+    tableRelativePath = createAppendTable();
+    pkTableRelativePath = createPrimaryKeyTable();
+  }
+
+  @Test
+  public void testReadAppendTable() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s`", 
tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testReadPrimaryKeyTable() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s`", 
pkTableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "dave")
+      .addRow(2, "erin")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testProjectionPushdown() throws Exception {
+    String query = String.format("select name from dfs.tmp.`%s`", 
tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*columns=\\[.*name.*\\]")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("alice")
+      .addRow("bob")
+      .addRow("carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testMultiColumnProjection() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s`", 
tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdown() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id = 
2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownGT() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id > 
1", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownLT() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id < 
3", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*3")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownGE() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id 
>= 2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownLE() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id 
<= 2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownNE() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id 
<> 2", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownAnd() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id > 
1 and id < 3", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanAnd")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownOr() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id = 
1 or id = 3", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanOr")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testFilterPushdownNot() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where not 
(id = 2)", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String query = String.format("select id from dfs.tmp.`%s` limit 2", 
tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*maxRecords=2")
+      .match(true);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(2, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testCombinedPushdownFilterProjectionLimit() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` where id > 
1 limit 1", tableRelativePath);
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1")
+      .include(PAIMON_SCAN_PATTERN + ".*maxRecords=1")
+      .match(true);
+
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2, "bob")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectWildcard() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s`", 
tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, "alice")
+      .addRow(2, "bob")
+      .addRow(3, "carol")
+      .build();
+    new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectWithOrderBy() throws Exception {
+    String query = String.format("select id, name from dfs.tmp.`%s` order by 
id desc", tableRelativePath);
+    RowSet results = queryBuilder().sql(query).rowSet();
+    TupleMetadata actualSchema = results.schema();
+    assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type());
+    assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode())
+      .add("name", TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").mode())
+      .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(3, "carol")
+      .addRow(2, "bob")
+      .addRow(1, "alice")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectWithCount() throws Exception {
+    String query = String.format("select count(*) from dfs.tmp.`%s`", 
tableRelativePath);
+
+    assertEquals(3, queryBuilder().sql(query).singletonLong());
+  }
+
+  @Test
+  public void testInvalidColumnName() throws Exception {
+    String query = String.format("select id, invalid_column from 
dfs.tmp.`%s`", tableRelativePath);
+    try {
+      queryBuilder().sql(query).run();
+      fail("Expected UserRemoteException for invalid column name");
+    } catch (UserRemoteException e) {
+      assertThat(e.getVerboseMessage(), containsString("invalid_column"));
+    }
+  }
+
+  @Test
+  public void testSelectWithSnapshotId() throws Exception {
+    String snapshotQuery = String.format(
+      "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time 
limit 1", tableRelativePath);
+
+    long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+    String query = String.format(
+      "select id, name from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => 
%d))",
+      tableRelativePath, snapshotId);
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectWithSnapshotAsOfTime() throws Exception {
+    String snapshotQuery = String.format(
+      "select commit_time from dfs.tmp.`%s#snapshots` order by commit_time 
limit 1", tableRelativePath);
+
+    long snapshotTime = queryBuilder().sql(snapshotQuery).singletonLong();
+    String query = String.format(
+      "select id, name from table(dfs.tmp.`%s`(type => 'paimon', 
snapshotAsOfTime => %d))",
+      tableRelativePath, snapshotTime);
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectWithSnapshotIdAndSnapshotAsOfTime() throws Exception {
+    String query = String.format(
+      "select * from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d, 
snapshotAsOfTime => %d))",
+      tableRelativePath, 123, 456);
+    try {
+      queryBuilder().sql(query).run();
+      fail();
+    } catch (UserRemoteException e) {
+      assertThat(e.getVerboseMessage(),
+        containsString("Both 'snapshotId' and 'snapshotAsOfTime' cannot be 
specified"));
+    }
+  }
+
+  @Test
+  public void testSelectSnapshotsMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#snapshots`", 
tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectSchemasMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#schemas`", 
tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testSelectFilesMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#files`", 
tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectManifestsMetadata() throws Exception {

Review Comment:
   Looking good. Can you please add a `SerDe` test? If plan serialization is
broken, users won't be able to update the config. Take a look here for an
example:
   https://github.com/apache/drill/blob/bcb43863810360a81d550ed905eccc70b1ba3ec8/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java#L564-L570
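
   A sketch along those lines, modeled on the linked Excel test (the expected
count of 3 assumes the append table created in `setUpBeforeClass`):

```java
@Test
public void testSerDe() throws Exception {
  String sql = String.format("select count(*) as cnt from dfs.tmp.`%s`", tableRelativePath);
  // Round-trip the physical plan through JSON to verify the group scan and
  // format config serialize and deserialize cleanly.
  String plan = queryBuilder().sql(sql).explainJson();
  long cnt = queryBuilder().physical(plan).singletonLong();
  assertEquals("Counts should match", 3L, cnt);
}
```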





> Format plugin for Apache Paimon
> -------------------------------
>
>                 Key: DRILL-8542
>                 URL: https://issues.apache.org/jira/browse/DRILL-8542
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Storage - Other
>            Reporter: Letian Jiang
>            Priority: Major
>
> Introduce a Paimon format plugin, enabling Drill users to query Paimon tables 
> directly via filesystem paths.
>  * support reading data from Paimon tables in Parquet and ORC formats
>  * support projection/filter/limit pushdown 
>  * support snapshot read via table() with snapshotId or snapshotAsOfTime
>  * support metadata tables:  #snapshots, #schemas, #files, #manifests
> Usage examples:
>  * SELECT * FROM dfs.`/path/to/paimon_table`
>  * SELECT * FROM table(dfs.`/path/to/paimon_table`(type => 'paimon', snapshotId => 123))
>  * SELECT * FROM table(dfs.`/path/to/paimon_table`(type => 'paimon', snapshotAsOfTime => 1700000000000))
>  * SELECT * FROM dfs.`/path/to/paimon_table#snapshots`
> Reference:
> * https://paimon.apache.org/docs
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
