cgivre commented on code in PR #3035: URL: https://github.com/apache/drill/pull/3035#discussion_r2736904808
########## contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java: ########## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon.read; + +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; Review Comment: Note: This is using the older version of the `ManagedReader` class. Please use `org.apache.drill.exec.physical.impl.scan.v3.ManagedReader`. This will require refactoring this class a bit. ########## contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon.format; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.logical.FormatPluginConfig; + +import java.util.Map; +import java.util.Objects; + +@JsonTypeName(PaimonFormatPluginConfig.NAME) +@JsonDeserialize(builder = PaimonFormatPluginConfig.PaimonFormatPluginConfigBuilder.class) +public class PaimonFormatPluginConfig implements FormatPluginConfig { + + public static final String NAME = "paimon"; + + private final Map<String, String> properties; + + // Time travel: load a specific snapshot id. + private final Long snapshotId; + + // Time travel: load the latest snapshot at or before the given timestamp (millis). 
+ private final Long snapshotAsOfTime; + + @JsonCreator + public PaimonFormatPluginConfig(PaimonFormatPluginConfigBuilder builder) { + this.properties = builder.properties; + this.snapshotId = builder.snapshotId; + this.snapshotAsOfTime = builder.snapshotAsOfTime; + } + + public static PaimonFormatPluginConfigBuilder builder() { + return new PaimonFormatPluginConfigBuilder(); + } + + public Map<String, String> getProperties() { + return properties; + } + + public Long getSnapshotId() { + return snapshotId; + } + + public Long getSnapshotAsOfTime() { + return snapshotAsOfTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonFormatPluginConfig that = (PaimonFormatPluginConfig) o; + return Objects.equals(properties, that.properties) + && Objects.equals(snapshotId, that.snapshotId) + && Objects.equals(snapshotAsOfTime, that.snapshotAsOfTime); + } + + @Override + public int hashCode() { + return Objects.hash(properties, snapshotId, snapshotAsOfTime); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("properties", properties) + .field("snapshotId", snapshotId) + .field("snapshotAsOfTime", snapshotAsOfTime) + .toString(); + } + + @JsonPOJOBuilder(withPrefix = "") + public static class PaimonFormatPluginConfigBuilder { + private Map<String, String> properties; + + private Long snapshotId; + + private Long snapshotAsOfTime; + + public PaimonFormatPluginConfigBuilder properties(Map<String, String> properties) { + this.properties = properties; + return this; + } + + public PaimonFormatPluginConfigBuilder snapshotId(Long snapshotId) { + this.snapshotId = snapshotId; + return this; + } + + public PaimonFormatPluginConfigBuilder snapshotAsOfTime(Long snapshotAsOfTime) { + this.snapshotAsOfTime = snapshotAsOfTime; + return this; + } + + public PaimonFormatPluginConfig build() { + return new PaimonFormatPluginConfig(this); + } + } + +} Review Comment: Please add new line at the end of all classes. Here and elsewhere.. ########## contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon.format; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.PlannerPhase; +import org.apache.drill.exec.planner.common.DrillStatsTable; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.schema.SchemaProvider; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.PluginRulesProviderImpl; +import org.apache.drill.exec.store.StoragePluginRulesSupplier; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.paimon.PaimonGroupScan; +import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor; +import org.apache.drill.exec.store.plan.rel.PluginRel; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +public class PaimonFormatPlugin implements FormatPlugin { + + private static final String PAIMON_CONVENTION_PREFIX = "PAIMON."; + + private static final AtomicInteger NEXT_ID = new AtomicInteger(0); + + private final FileSystemConfig storageConfig; + + private final PaimonFormatPluginConfig config; + + private final Configuration fsConf; + + private final DrillbitContext context; + + private final String name; + + private final PaimonFormatMatcher matcher; + + private final StoragePluginRulesSupplier storagePluginRulesSupplier; + + public PaimonFormatPlugin( + String name, + DrillbitContext context, + Configuration fsConf, + FileSystemConfig storageConfig, + PaimonFormatPluginConfig config) { + this.storageConfig = storageConfig; + this.config = config; + this.fsConf = fsConf; + this.context = context; + this.name = name; + this.matcher = new PaimonFormatMatcher(this); + this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement()); + } + + private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) { + Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX + name, PluginRel.class); + return StoragePluginRulesSupplier.builder() + .rulesProvider(new PluginRulesProviderImpl(convention, PaimonPluginImplementor::new)) + .supportsFilterPushdown(true) + .supportsProjectPushdown(true) + .supportsLimitPushdown(true) + .convention(convention) + .build(); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return false; + } + + @Override + public boolean supportsAutoPartitioning() { + return false; + } + + @Override + public FormatMatcher getMatcher() { + return matcher; + } + + @Override + public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) { + throw new UnsupportedOperationException(); + } + + @Override + public Set<? 
extends RelOptRule> getOptimizerRules(PlannerPhase phase) { + switch (phase) { + case PHYSICAL: + case LOGICAL: + return storagePluginRulesSupplier.getOptimizerRules(); + case LOGICAL_PRUNE_AND_JOIN: + case LOGICAL_PRUNE: + case PARTITION_PRUNING: + case JOIN_PLANNING: + default: + return Collections.emptySet(); + } + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException { + return PaimonGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .path(getPath(selection)) + .columns(columns) + .maxRecords(-1) + .build(); + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException { + SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider(); + TupleMetadata schema = schemaProvider != null + ? schemaProvider.read().getSchema() + : null; + return PaimonGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .schema(schema) + .path(getPath(selection)) + .columns(columns) + .maxRecords(-1) + .build(); + } + + @Override + public boolean supportsStatistics() { + return false; + } + + @Override + public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { Review Comment: Take a look at `EasyFormatPlugin`. ########## contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon.format; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.PlannerPhase; +import org.apache.drill.exec.planner.common.DrillStatsTable; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.schema.SchemaProvider; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.PluginRulesProviderImpl; +import org.apache.drill.exec.store.StoragePluginRulesSupplier; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.paimon.PaimonGroupScan; +import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor; +import org.apache.drill.exec.store.plan.rel.PluginRel; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +public class PaimonFormatPlugin implements FormatPlugin { Review Comment: Now I'm remembering why this looks overly complex. Was there a reason why you chose to directly implement `FormatPlugin` instead of extending `EasyFormatPlugin`? ########## contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java: ########## @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon; + +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.logical.security.PlainCredentialsProvider; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetBuilder; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.hadoop.conf.Configuration; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.DataTypes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringContains.containsString; +import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class PaimonQueriesTest extends ClusterTest { + + private static final String DB_NAME = "default"; + private static final String TABLE_NAME = "append_table"; + private static final String PK_TABLE_NAME = "pk_table"; + private static final String PAIMON_SCAN_PATTERN = "(PAIMON_GROUP_SCAN|PaimonGroupScan)"; + private static String tableRelativePath; + private static String pkTableRelativePath; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); + + StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); + FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig(); + Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats()); + formats.put("paimon", PaimonFormatPluginConfig.builder().build()); + FileSystemConfig newPluginConfig = new FileSystemConfig( + pluginConfig.getConnection(), + pluginConfig.getConfig(), + pluginConfig.getWorkspaces(), + formats, + PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER); + newPluginConfig.setEnabled(pluginConfig.isEnabled()); + pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig); + + tableRelativePath = createAppendTable(); + pkTableRelativePath = createPrimaryKeyTable(); + } + + @Test + public void testReadAppendTable() throws Exception { + String query = String.format("select id, name from 
dfs.tmp.`%s`", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testReadPrimaryKeyTable() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s`", pkTableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "dave") + .addRow(2, "erin") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testProjectionPushdown() throws Exception { + String query = String.format("select name from dfs.tmp.`%s`", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*columns=\\[.*name.*\\]") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("alice") + .addRow("bob") + .addRow("carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testMultiColumnProjection() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s`", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdown() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id = 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + 
.include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownGT() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id > 1", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownLT() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id < 3", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*3") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownGE() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id >= 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), 
expectedSchema) + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownLE() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id <= 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownNE() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id <> 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownAnd() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id > 1 and id < 3", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanAnd") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownOr() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id = 1 or id = 3", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanOr") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, 
actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testFilterPushdownNot() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where not (id = 2)", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testLimitPushdown() throws Exception { + String query = String.format("select id from dfs.tmp.`%s` limit 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*maxRecords=2") + .match(true); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(2, results.rowCount()); + results.clear(); + } + + @Test + public void testCombinedPushdownFilterProjectionLimit() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id > 1 limit 1", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1") + .include(PAIMON_SCAN_PATTERN + ".*maxRecords=1") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSelectWildcard() throws Exception { + String query = String.format("select * from dfs.tmp.`%s`", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = 
new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testSelectWithOrderBy() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` order by id desc", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(3, "carol") + .addRow(2, "bob") + .addRow(1, "alice") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSelectWithCount() throws Exception { + String query = String.format("select count(*) from dfs.tmp.`%s`", tableRelativePath); + + assertEquals(3, queryBuilder().sql(query).singletonLong()); + } + + @Test + public void testInvalidColumnName() throws Exception { + String query = String.format("select id, invalid_column from dfs.tmp.`%s`", tableRelativePath); + try { + queryBuilder().sql(query).run(); + fail("Expected UserRemoteException for invalid column name"); + } catch (UserRemoteException e) { + assertThat(e.getVerboseMessage(), containsString("invalid_column")); + } + } + + @Test + public void testSelectWithSnapshotId() throws Exception { + String snapshotQuery = String.format( + "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time limit 1", tableRelativePath); + + long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong(); + String query = String.format( + "select id, name from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d))", + tableRelativePath, snapshotId); + long count = queryBuilder().sql(query).run().recordCount(); + assertEquals(2, count); + } + + @Test + public void testSelectWithSnapshotAsOfTime() throws Exception { + String snapshotQuery = String.format( + "select commit_time from dfs.tmp.`%s#snapshots` order by commit_time limit 1", tableRelativePath); + + long snapshotTime = queryBuilder().sql(snapshotQuery).singletonLong(); + String query = String.format( + "select id, name from table(dfs.tmp.`%s`(type => 'paimon', snapshotAsOfTime => %d))", + tableRelativePath, snapshotTime); + long count = queryBuilder().sql(query).run().recordCount(); + assertEquals(2, count); + } + + @Test + public void testSelectWithSnapshotIdAndSnapshotAsOfTime() throws Exception { + String query = String.format( + "select * from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d, snapshotAsOfTime => %d))", + tableRelativePath, 123, 456); + try { + queryBuilder().sql(query).run(); + fail(); + } catch (UserRemoteException e) { + assertThat(e.getVerboseMessage(), + containsString("Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified")); + } + } + + @Test + public void testSelectSnapshotsMetadata() throws Exception { + String query = 
String.format("select * from dfs.tmp.`%s#snapshots`", tableRelativePath); + + long count = queryBuilder().sql(query).run().recordCount(); + assertEquals(2, count); + } + + @Test + public void testSelectSchemasMetadata() throws Exception { + String query = String.format("select * from dfs.tmp.`%s#schemas`", tableRelativePath); + + long count = queryBuilder().sql(query).run().recordCount(); + assertEquals(1, count); + } + + @Test + public void testSelectFilesMetadata() throws Exception { + String query = String.format("select * from dfs.tmp.`%s#files`", tableRelativePath); + + long count = queryBuilder().sql(query).run().recordCount(); + assertEquals(2, count); + } + + @Test + public void testSelectManifestsMetadata() throws Exception { Review Comment: Looking good. Can you please add a `SerDe` test? If this test fails, you won't be able to update the config. Take a look here as an example. : https://github.com/apache/drill/blob/bcb43863810360a81d550ed905eccc70b1ba3ec8/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java#L564-L570 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
