This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.3 by this push:
new cf0515d4e3 [core] Disable data evolution manifest filter for now
(#6443)
cf0515d4e3 is described below
commit cf0515d4e334e290f89db9724f23be65d4e4e34c
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 21 18:46:50 2025 +0800
[core] Disable data evolution manifest filter for now (#6443)
---
.../org/apache/paimon/AppendOnlyFileStore.java | 12 +++++
.../operation/DataEvolutionFileStoreScan.java | 59 ++++++++++++++++++++++
.../paimon/table/DataEvolutionTableTest.java | 34 +++++++++++++
3 files changed, 105 insertions(+)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index da40bd20ca..9fb9694e56 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -27,6 +27,7 @@ import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.operation.BucketSelectConverter;
import org.apache.paimon.operation.BucketedAppendFileStoreWrite;
+import org.apache.paimon.operation.DataEvolutionFileStoreScan;
import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.predicate.Predicate;
@@ -168,6 +169,17 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
return Optional.empty();
};
+ if (options().dataEvolutionEnabled()) {
+ return new DataEvolutionFileStoreScan(
+ newManifestsReader(),
+ bucketSelectConverter,
+ snapshotManager(),
+ schemaManager,
+ schema,
+ manifestFileFactory(),
+ options.scanManifestParallelism());
+ }
+
return new AppendOnlyFileStoreScan(
newManifestsReader(),
bucketSelectConverter,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
new file mode 100644
index 0000000000..fad67e3183
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.SnapshotManager;
+
+/** {@link FileStoreScan} for data-evolution enabled table. */
+public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
+
+ public DataEvolutionFileStoreScan(
+ ManifestsReader manifestsReader,
+ BucketSelectConverter bucketSelectConverter,
+ SnapshotManager snapshotManager,
+ SchemaManager schemaManager,
+ TableSchema schema,
+ ManifestFile.Factory manifestFileFactory,
+ Integer scanManifestParallelism) {
+ super(
+ manifestsReader,
+ bucketSelectConverter,
+ snapshotManager,
+ schemaManager,
+ schema,
+ manifestFileFactory,
+ scanManifestParallelism,
+ false);
+ }
+
+ public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
+ return this;
+ }
+
+ /** Note: Keep this thread-safe. */
+ @Override
+ protected boolean filterByStats(ManifestEntry entry) {
+ return true;
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 8cbc0413ff..dc4db3da5b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -23,6 +23,8 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.DataEvolutionFileReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
@@ -31,6 +33,7 @@ import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -385,6 +388,37 @@ public class DataEvolutionTableTest extends TableTestBase {
});
}
+    @Test
+    public void testPredicate() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write a full row (f0, f1, f2) as the base file.
+        try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write.prepareCommit();
+            commit.commit(commitables);
+        }
+
+        // Write only column f2 as an extension file attached to the same row (firstRowId = 0).
+        RowType writeType1 = schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("c")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(schema.rowType());
+        Predicate predicate = predicateBuilder.notEqual(2, BinaryString.fromString("b"));
+        readBuilder.withFilter(predicate);
+        // Manifest-level pruning is disabled for data-evolution tables, so both files (base +
+        // f2 extension) must survive the filter and remain in the planned split.
+        assertThat(((DataSplit) readBuilder.newScan().plan().splits().get(0)).dataFiles().size())
+                .isEqualTo(2);
+    }
+
protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());