cgivre commented on code in PR #3035:
URL: https://github.com/apache/drill/pull/3035#discussion_r2732737813
##########
exec/java-exec/pom.xml:
##########
@@ -252,6 +243,10 @@
<groupId>${calcite.groupId}</groupId>
<artifactId>calcite-core</artifactId>
</dependency>
+ <dependency>
Review Comment:
Is this needed if we already imported the library in the main `pom.xml` file?
##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatLocationTransformer;
+
+import java.util.function.Function;
+
+public class PaimonFormatLocationTransformer implements
FormatLocationTransformer {
+ public static final FormatLocationTransformer INSTANCE = new
PaimonFormatLocationTransformer();
+
+ public static final String METADATA_SEPARATOR = "#";
+
+ @Override
+ public boolean canTransform(String location) {
+ return getMetadataType(location) != null;
+ }
+
+ private PaimonMetadataType getMetadataType(String location) {
+ String metadataType = StringUtils.substringAfterLast(location,
METADATA_SEPARATOR);
+ return StringUtils.isNotEmpty(metadataType)
Review Comment:
Nit: Please use if/else. Much easier to follow if you are not familiar
with the code.
##########
distribution/src/assemble/component.xml:
##########
@@ -44,6 +44,7 @@
<include>org.apache.drill.contrib:drill-format-syslog:jar</include>
<include>org.apache.drill.contrib:drill-format-xml:jar</include>
<include>org.apache.drill.contrib:drill-iceberg-format:jar</include>
+ <include>org.apache.drill.contrib:drill-paimon-format:jar</include>
Review Comment:
nit: Please keep these in alphabetical order.
##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import java.util.Locale;
+
+public enum PaimonMetadataType {
+ SNAPSHOTS("snapshots"),
+ SCHEMAS("schemas"),
+ FILES("files"),
+ MANIFESTS("manifests");
+
+ private final String name;
+
+ PaimonMetadataType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static PaimonMetadataType from(String value) {
+ if (value == null) {
+ return null;
+ }
+ String normalized = value.toLowerCase(Locale.ROOT);
+ for (PaimonMetadataType type : values()) {
+ if (type.name.equals(normalized)) {
+ return type;
+ }
+ }
+ return null;
+ }
+}
Review Comment:
Nit: Please add blank line at the end of classes. Here and elsewhere.
##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PaimonFormatPlugin implements FormatPlugin {
+
+ private static final String PAIMON_CONVENTION_PREFIX = "PAIMON.";
+
+ private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+ private final FileSystemConfig storageConfig;
+
+ private final PaimonFormatPluginConfig config;
+
+ private final Configuration fsConf;
+
+ private final DrillbitContext context;
+
+ private final String name;
+
+ private final PaimonFormatMatcher matcher;
+
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+ public PaimonFormatPlugin(
+ String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ FileSystemConfig storageConfig,
+ PaimonFormatPluginConfig config) {
+ this.storageConfig = storageConfig;
+ this.config = config;
+ this.fsConf = fsConf;
+ this.context = context;
+ this.name = name;
+ this.matcher = new PaimonFormatMatcher(this);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name +
NEXT_ID.getAndIncrement());
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String
name) {
+ Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX +
name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention,
PaimonPluginImplementor::new))
+ .supportsFilterPushdown(true)
+ .supportsProjectPushdown(true)
+ .supportsLimitPushdown(true)
+ .convention(convention)
+ .build();
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location,
List<String> partitionColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection
selection, List<SchemaPath> columns) throws IOException {
+ return PaimonGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .path(getPath(selection))
+ .columns(columns)
+ .maxRecords(-1)
+ .build();
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection
selection,
+ List<SchemaPath> columns, MetadataProviderManager metadataProviderManager)
throws IOException {
+ SchemaProvider schemaProvider =
metadataProviderManager.getSchemaProvider();
+ TupleMetadata schema = schemaProvider != null
Review Comment:
Again, logic is a bit confusing to read. Please use if/else.
##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PaimonFormatPlugin implements FormatPlugin {
+
+ private static final String PAIMON_CONVENTION_PREFIX = "PAIMON.";
+
+ private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+ private final FileSystemConfig storageConfig;
+
+ private final PaimonFormatPluginConfig config;
+
+ private final Configuration fsConf;
+
+ private final DrillbitContext context;
+
+ private final String name;
+
+ private final PaimonFormatMatcher matcher;
+
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+ public PaimonFormatPlugin(
+ String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ FileSystemConfig storageConfig,
+ PaimonFormatPluginConfig config) {
+ this.storageConfig = storageConfig;
+ this.config = config;
+ this.fsConf = fsConf;
+ this.context = context;
+ this.name = name;
+ this.matcher = new PaimonFormatMatcher(this);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name +
NEXT_ID.getAndIncrement());
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String
name) {
+ Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX +
name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention,
PaimonPluginImplementor::new))
+ .supportsFilterPushdown(true)
+ .supportsProjectPushdown(true)
+ .supportsLimitPushdown(true)
+ .convention(convention)
+ .build();
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location,
List<String> partitionColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection
selection, List<SchemaPath> columns) throws IOException {
+ return PaimonGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .path(getPath(selection))
+ .columns(columns)
+ .maxRecords(-1)
+ .build();
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection
selection,
+ List<SchemaPath> columns, MetadataProviderManager metadataProviderManager)
throws IOException {
+ SchemaProvider schemaProvider =
metadataProviderManager.getSchemaProvider();
+ TupleMetadata schema = schemaProvider != null
+ ? schemaProvider.read().getSchema()
+ : null;
+ return PaimonGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .schema(schema)
+ .path(getPath(selection))
+ .columns(columns)
+ .maxRecords(-1)
+ .build();
+ }
+
+ @Override
+ public boolean supportsStatistics() {
+ return false;
+ }
+
+ @Override
+ public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path
statsTablePath) {
Review Comment:
Just as a note, I don' think you actually have to overwrite all these
methods that are unimplemented. I believe the parent class already does this.
##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.paimon.table.source.Split;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+@JsonSerialize(using = PaimonWork.PaimonWorkSerializer.class)
+@JsonDeserialize(using = PaimonWork.PaimonWorkDeserializer.class)
+public class PaimonWork {
+ private final Split split;
+
+ public PaimonWork(Split split) {
+ this.split = split;
+ }
+
+ public Split getSplit() {
+ return split;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonWork that = (PaimonWork) o;
+ return Objects.equals(split, that.split);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(split);
+ }
+
+ @Override
+ public String toString() {
Review Comment:
Please use `PlanStringBuilder` here and elsewhere.
##########
contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(PaimonFormatPluginConfig.NAME)
+@JsonDeserialize(builder =
PaimonFormatPluginConfig.PaimonFormatPluginConfigBuilder.class)
+public class PaimonFormatPluginConfig implements FormatPluginConfig {
+
+ public static final String NAME = "paimon";
+
+ private final Map<String, String> properties;
+
+ private final Long snapshotId;
+
+ private final Long snapshotAsOfTime;
+
+ @JsonCreator
+ public PaimonFormatPluginConfig(PaimonFormatPluginConfigBuilder builder) {
+ this.properties = builder.properties;
+ this.snapshotId = builder.snapshotId;
+ this.snapshotAsOfTime = builder.snapshotAsOfTime;
+ }
+
+ public static PaimonFormatPluginConfigBuilder builder() {
+ return new PaimonFormatPluginConfigBuilder();
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public Long getSnapshotId() {
+ return snapshotId;
+ }
+
+ public Long getSnapshotAsOfTime() {
+ return snapshotAsOfTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
Review Comment:
I think all of our format plugins also override `toString()`. Please use
the `PlanStringBuilder` to do so.
##########
contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java:
##########
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringContains.containsString;
+import static
org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PaimonQueriesTest extends ClusterTest {
+
+ private static final String DB_NAME = "default";
+ private static final String TABLE_NAME = "append_table";
+ private static final String PK_TABLE_NAME = "pk_table";
+ private static String tableRelativePath;
+ private static String pkTableRelativePath;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ StoragePluginRegistry pluginRegistry =
cluster.drillbit().getContext().getStorage();
+ FileSystemConfig pluginConfig = (FileSystemConfig)
pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+ Map<String, FormatPluginConfig> formats = new
HashMap<>(pluginConfig.getFormats());
+ formats.put("paimon", PaimonFormatPluginConfig.builder().build());
+ FileSystemConfig newPluginConfig = new FileSystemConfig(
+ pluginConfig.getConnection(),
+ pluginConfig.getConfig(),
+ pluginConfig.getWorkspaces(),
+ formats,
+ PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ newPluginConfig.setEnabled(pluginConfig.isEnabled());
+ pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+ tableRelativePath = createAppendTable();
+ pkTableRelativePath = createPrimaryKeyTable();
+ }
+
+ @Test
+ public void testReadAppendTable() throws Exception {
+ String query = String.format("select id, name from dfs.tmp.`%s`",
tableRelativePath);
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues(1, "alice")
+ .baselineValues(2, "bob")
+ .baselineValues(3, "carol")
+ .go();
+ }
+
+ @Test
+ public void testReadPrimaryKeyTable() throws Exception {
Review Comment:
First of all, thank you for including robust unit tests.
However, I have a few comments:
1. The tests where you are testing the predicate pushdowns. These tests
are not actually testing whether the pushdowns are happening. For that, you
need to look at the query plan. Take a look here:
https://github.com/apache/drill/blob/bcb43863810360a81d550ed905eccc70b1ba3ec8/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java#L38-L48
2. For other tests, please refactor like this one:
https://github.com/apache/drill/blob/bcb43863810360a81d550ed905eccc70b1ba3ec8/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java#L164-L183.
These tests ensure that the Drill is actually returning the correct data
types and so forth.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]