This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 66430f4b13 [test] Rename to SimpleTableTestBase and refactor overwrite 
tests (#5334)
66430f4b13 is described below

commit 66430f4b136a3f57c093c8b83f16ba4358d4afb9
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 24 13:37:53 2025 +0800

    [test] Rename to SimpleTableTestBase and refactor overwrite tests (#5334)
---
 ...bleTest.java => AppendOnlySimpleTableTest.java} |  44 +---
 .../apache/paimon/table/OverwriteTableTest.java    | 232 +++++++++++++++++++++
 ...bleTest.java => PrimaryKeySimpleTableTest.java} | 104 ++++-----
 ...TableTestBase.java => SimpleTableTestBase.java} | 189 ++---------------
 .../paimon/table/WritePreemptMemoryTest.java       | 161 --------------
 5 files changed, 295 insertions(+), 435 deletions(-)

diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
similarity index 96%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 85ab7484f1..715584cdc3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -94,7 +94,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link AppendOnlyFileStoreTable}. */
-public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
+public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
 
     @Test
     public void testMultipleWriters() throws Exception {
@@ -218,7 +218,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
         generateBranch(table);
 
-        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+        FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
         writeBranchData(tableBranch);
         List<Split> splits = 
toSplits(tableBranch.newSnapshotReader().read().dataSplits());
         TableRead read = tableBranch.newRead();
@@ -408,7 +408,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
         generateBranch(table);
 
-        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+        FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
         writeBranchData(tableBranch);
 
         List<Split> splits =
@@ -1198,44 +1198,6 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
 
-    @Override
-    protected FileStoreTable createFileStoreTable(String branch, 
Consumer<Options> configure)
-            throws Exception {
-        Options conf = new Options();
-        conf.set(CoreOptions.PATH, tablePath.toString());
-        conf.set(CoreOptions.BRANCH, branch);
-        configure.accept(conf);
-        if (!conf.contains(BUCKET_KEY) && conf.get(BUCKET) != -1) {
-            conf.set(BUCKET_KEY, "a");
-        }
-        TableSchema tableSchema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath, 
branch),
-                        new Schema(
-                                ROW_TYPE.getFields(),
-                                Collections.singletonList("pt"),
-                                Collections.emptyList(),
-                                conf.toMap(),
-                                ""));
-        return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
-    }
-
-    @Override
-    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
-        Options conf = new Options();
-        conf.set(CoreOptions.PATH, tablePath.toString());
-        TableSchema tableSchema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(
-                                OVERWRITE_TEST_ROW_TYPE.getFields(),
-                                Arrays.asList("pt0", "pt1"),
-                                Collections.emptyList(),
-                                conf.toMap(),
-                                ""));
-        return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
-    }
-
     protected FileStoreTable 
createUnawareBucketFileStoreTable(Consumer<Options> configure)
             throws Exception {
         Options conf = new Options();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/OverwriteTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/OverwriteTableTest.java
new file mode 100644
index 0000000000..eccadd38b2
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/OverwriteTableTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataFormatTestUtil;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.table.SimpleTableTestBase.getResult;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+/** Unit tests for table overwrite on both append-only and primary-key tables. */
+public class OverwriteTableTest extends TableTestBase {
+
+    @ParameterizedTest(name = "dynamic = {0}, partition={2}")
+    @MethodSource("overwriteTestData")
+    public void testOverwriteAppend(
+            boolean dynamicPartitionOverwrite,
+            List<InternalRow> overwriteData,
+            Map<String, String> overwritePartition,
+            List<String> expected)
+            throws Exception {
+        innerTestOverwrite(
+                false, dynamicPartitionOverwrite, overwriteData, 
overwritePartition, expected);
+    }
+
+    @ParameterizedTest(name = "dynamic = {0}, partition={2}")
+    @MethodSource("overwriteTestData")
+    public void testOverwritePrimaryKey(
+            boolean dynamicPartitionOverwrite,
+            List<InternalRow> overwriteData,
+            Map<String, String> overwritePartition,
+            List<String> expected)
+            throws Exception {
+        innerTestOverwrite(
+                true, dynamicPartitionOverwrite, overwriteData, 
overwritePartition, expected);
+    }
+
+    private void innerTestOverwrite(
+            boolean withPrimaryKey,
+            boolean dynamicPartitionOverwrite,
+            List<InternalRow> overwriteData,
+            Map<String, String> overwritePartition,
+            List<String> expected)
+            throws Exception {
+        Identifier identifier = identifier("T");
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt0", DataTypes.INT())
+                        .column("pt1", DataTypes.STRING())
+                        .column("v", DataTypes.STRING())
+                        .partitionKeys("pt0", "pt1");
+        if (withPrimaryKey) {
+            builder = builder.primaryKey("pk", "pt0", "pt1");
+            builder.option(BUCKET.key(), "1");
+        }
+        catalog.createTable(identifier, builder.build(), true);
+        Table originTable = catalog.getTable(identifier("T"));
+        FileStoreTable table = (FileStoreTable) originTable;
+        if (!dynamicPartitionOverwrite) {
+            table =
+                    table.copy(
+                            Collections.singletonMap(
+                                    
CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
+        }
+
+        // prepare data
+        // (1, 1, 'A', 'Hi'), (2, 1, 'A', 'Hello'), (3, 1, 'A', 'World'),
+        // (4, 1, 'B', 'To'), (5, 1, 'B', 'Apache'), (6, 1, 'B', 'Paimon')
+        // (7, 2, 'A', 'Test')
+        // (8, 2, 'B', 'Case')
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                InnerTableCommit commit = table.newCommit(commitUser)) {
+            write.write(overwriteRow(1, 1, "A", "Hi"));
+            write.write(overwriteRow(2, 1, "A", "Hello"));
+            write.write(overwriteRow(3, 1, "A", "World"));
+            write.write(overwriteRow(4, 1, "B", "To"));
+            write.write(overwriteRow(5, 1, "B", "Apache"));
+            write.write(overwriteRow(6, 1, "B", "Paimon"));
+            write.write(overwriteRow(7, 2, "A", "Test"));
+            write.write(overwriteRow(8, 2, "B", "Case"));
+            commit.commit(0, write.prepareCommit(true, 0));
+        }
+
+        // overwrite data
+        try (StreamTableWrite write = 
table.newWrite(commitUser).withIgnorePreviousFiles(true);
+                InnerTableCommit commit = table.newCommit(commitUser)) {
+            for (InternalRow row : overwriteData) {
+                write.write(row);
+            }
+            commit.withOverwrite(overwritePartition).commit(1, 
write.prepareCommit(true, 1));
+        }
+
+        // validate
+        List<Split> splits = new 
ArrayList<>(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newRead();
+        assertThat(
+                        getResult(
+                                read,
+                                splits,
+                                row ->
+                                        DataFormatTestUtil.toStringNoRowKind(
+                                                row, originTable.rowType())))
+                .hasSameElementsAs(expected);
+    }
+
+    private static List<Arguments> overwriteTestData() {
+        // dynamic, overwrite data, overwrite partition, expected
+        return Arrays.asList(
+                // nothing happens
+                arguments(
+                        true,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                "1, 1, A, Hi",
+                                "2, 1, A, Hello",
+                                "3, 1, A, World",
+                                "4, 1, B, To",
+                                "5, 1, B, Apache",
+                                "6, 1, B, Paimon",
+                                "7, 2, A, Test",
+                                "8, 2, B, Case")),
+                // delete all data
+                arguments(
+                        false,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Collections.emptyList()),
+                // specify one partition key
+                arguments(
+                        true,
+                        Arrays.asList(
+                                overwriteRow(1, 1, "A", "Where"), 
overwriteRow(2, 1, "A", "When")),
+                        Collections.singletonMap("pt0", "1"),
+                        Arrays.asList(
+                                "1, 1, A, Where",
+                                "2, 1, A, When",
+                                "4, 1, B, To",
+                                "5, 1, B, Apache",
+                                "6, 1, B, Paimon",
+                                "7, 2, A, Test",
+                                "8, 2, B, Case")),
+                arguments(
+                        false,
+                        Arrays.asList(
+                                overwriteRow(1, 1, "A", "Where"), 
overwriteRow(2, 1, "A", "When")),
+                        Collections.singletonMap("pt0", "1"),
+                        Arrays.asList(
+                                "1, 1, A, Where",
+                                "2, 1, A, When",
+                                "7, 2, A, Test",
+                                "8, 2, B, Case")),
+                // all dynamic
+                arguments(
+                        true,
+                        Arrays.asList(
+                                overwriteRow(4, 1, "B", "Where"),
+                                overwriteRow(5, 1, "B", "When"),
+                                overwriteRow(10, 2, "A", "Static"),
+                                overwriteRow(11, 2, "A", "Dynamic")),
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                "1, 1, A, Hi",
+                                "2, 1, A, Hello",
+                                "3, 1, A, World",
+                                "4, 1, B, Where",
+                                "5, 1, B, When",
+                                "10, 2, A, Static",
+                                "11, 2, A, Dynamic",
+                                "8, 2, B, Case")),
+                arguments(
+                        false,
+                        Arrays.asList(
+                                overwriteRow(4, 1, "B", "Where"),
+                                overwriteRow(5, 1, "B", "When"),
+                                overwriteRow(10, 2, "A", "Static"),
+                                overwriteRow(11, 2, "A", "Dynamic")),
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                "4, 1, B, Where",
+                                "5, 1, B, When",
+                                "10, 2, A, Static",
+                                "11, 2, A, Dynamic")));
+    }
+
+    private static InternalRow overwriteRow(Object... values) {
+        return GenericRow.of(
+                values[0],
+                values[1],
+                BinaryString.fromString((String) values[2]),
+                BinaryString.fromString((String) values[3]));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
similarity index 97%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 94bc52305d..40b8498efc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -96,7 +96,6 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.BRANCH;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
@@ -124,19 +123,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link PrimaryKeyFileStoreTable}. */
-public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
-
-    protected static final Function<InternalRow, String> 
COMPATIBILITY_BATCH_ROW_TO_STRING =
-            rowData ->
-                    rowData.getInt(0)
-                            + "|"
-                            + rowData.getInt(1)
-                            + "|"
-                            + rowData.getLong(2)
-                            + "|"
-                            + new String(rowData.getBinary(3))
-                            + "|"
-                            + new String(rowData.getBinary(4));
+public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
 
     @Test
     public void testMultipleWriters() throws Exception {
@@ -320,7 +307,7 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
     public void testBranchBatchReadWrite() throws Exception {
         FileStoreTable table = createFileStoreTable();
         generateBranch(table);
-        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+        FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
         writeBranchData(tableBranch);
         List<Split> splits = 
toSplits(tableBranch.newSnapshotReader().read().dataSplits());
         TableRead read = tableBranch.newRead();
@@ -402,7 +389,7 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
         generateBranch(table);
 
-        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+        FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
         writeBranchData(tableBranch);
 
         List<Split> splits =
@@ -2210,21 +2197,49 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         }
     }
 
-    @Override
-    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
-        Options conf = new Options();
-        conf.set(CoreOptions.PATH, tablePath.toString());
-        conf.set(BUCKET, 1);
-        TableSchema tableSchema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(
-                                OVERWRITE_TEST_ROW_TYPE.getFields(),
-                                Arrays.asList("pt0", "pt1"),
-                                Arrays.asList("pk", "pt0", "pt1"),
-                                conf.toMap(),
-                                ""));
-        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
+    @Test
+    public void writeMultiplePartitions() throws Exception {
+        testWritePreemptMemory(false);
+    }
+
+    @Test
+    public void writeSinglePartition() throws Exception {
+        testWritePreemptMemory(true);
+    }
+
+    private void testWritePreemptMemory(boolean singlePartition) throws 
Exception {
+        // write
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            // Run with minimal memory to trigger more intense preemption.
+                            // Currently a writer needs at least one page.
+                            int pages = 10;
+                            options.set(
+                                    CoreOptions.WRITE_BUFFER_SIZE, new 
MemorySize(pages * 1024));
+                            options.set(CoreOptions.PAGE_SIZE, new 
MemorySize(1024));
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        Random random = new Random();
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < 10_000; i++) {
+            GenericRow row = rowData(singlePartition ? 0 : random.nextInt(5), 
i, i * 10L);
+            write.write(row);
+            expected.add(BATCH_ROW_TO_STRING.apply(row));
+        }
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        // read
+        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newRead();
+        List<String> results = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            results.addAll(getResult(read, splits, binaryRow(i), 0, 
BATCH_ROW_TO_STRING));
+        }
+        assertThat(results).containsExactlyInAnyOrder(expected.toArray(new 
String[0]));
     }
 
     @Override
@@ -2245,31 +2260,4 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 ""));
         return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
-
-    @Override
-    protected FileStoreTable createFileStoreTable(String branch, 
Consumer<Options> configure)
-            throws Exception {
-        return createFileStoreTable(branch, configure, ROW_TYPE);
-    }
-
-    private FileStoreTable createFileStoreTable(
-            String branch, Consumer<Options> configure, RowType rowType) 
throws Exception {
-        Options options = new Options();
-        options.set(CoreOptions.PATH, tablePath.toString());
-        options.set(BUCKET, 1);
-        options.set(BRANCH, branch);
-        configure.accept(options);
-        TableSchema latestSchema =
-                new SchemaManager(LocalFileIO.create(), 
tablePath).latest().get();
-        TableSchema tableSchema =
-                new TableSchema(
-                        latestSchema.id(),
-                        latestSchema.fields(),
-                        latestSchema.highestFieldId(),
-                        latestSchema.partitionKeys(),
-                        latestSchema.primaryKeys(),
-                        options.toMap(),
-                        latestSchema.comment());
-        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
similarity index 90%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 4f8e913b8f..cbf9e106ae 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -25,7 +25,6 @@ import org.apache.paimon.TestFileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -78,8 +77,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
@@ -125,10 +122,12 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
 
-/** Base test class for {@link FileStoreTable}. */
-public abstract class FileStoreTableTestBase {
+/**
+ * Base test class for simple tables. Here "simple" means the table uses a fixed schema, see
+ * {@link #ROW_TYPE}.
+ */
+public abstract class SimpleTableTestBase {
 
     protected static final String BRANCH_NAME = "branch1";
 
@@ -145,14 +144,6 @@ public abstract class FileStoreTableTestBase {
                     },
                     new String[] {"pt", "a", "b", "c", "d", "e", "f"});
 
-    // for overwrite test
-    protected static final RowType OVERWRITE_TEST_ROW_TYPE =
-            RowType.of(
-                    new DataType[] {
-                        DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), 
DataTypes.STRING()
-                    },
-                    new String[] {"pk", "pt0", "pt1", "v"});
-
     protected static final int[] PROJECTION = new int[] {2, 1};
     protected static final Function<InternalRow, String> BATCH_ROW_TO_STRING =
             rowData ->
@@ -277,62 +268,6 @@ public abstract class FileStoreTableTestBase {
                 .containsExactlyElementsOf(expected);
     }
 
-    @ParameterizedTest(name = "dynamic = {0}, partition={2}")
-    @MethodSource("overwriteTestData")
-    public void testOverwriteNothing(
-            boolean dynamicPartitionOverwrite,
-            List<InternalRow> overwriteData,
-            Map<String, String> overwritePartition,
-            List<String> expected)
-            throws Exception {
-        FileStoreTable table = overwriteTestFileStoreTable();
-        if (!dynamicPartitionOverwrite) {
-            table =
-                    table.copy(
-                            Collections.singletonMap(
-                                    
CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
-        }
-
-        // prepare data
-        // (1, 1, 'A', 'Hi'), (2, 1, 'A', 'Hello'), (3, 1, 'A', 'World'),
-        // (4, 1, 'B', 'To'), (5, 1, 'B', 'Apache'), (6, 1, 'B', 'Paimon')
-        // (7, 2, 'A', 'Test')
-        // (8, 2, 'B', 'Case')
-        try (StreamTableWrite write = table.newWrite(commitUser);
-                InnerTableCommit commit = table.newCommit(commitUser)) {
-            write.write(overwriteRow(1, 1, "A", "Hi"));
-            write.write(overwriteRow(2, 1, "A", "Hello"));
-            write.write(overwriteRow(3, 1, "A", "World"));
-            write.write(overwriteRow(4, 1, "B", "To"));
-            write.write(overwriteRow(5, 1, "B", "Apache"));
-            write.write(overwriteRow(6, 1, "B", "Paimon"));
-            write.write(overwriteRow(7, 2, "A", "Test"));
-            write.write(overwriteRow(8, 2, "B", "Case"));
-            commit.commit(0, write.prepareCommit(true, 0));
-        }
-
-        // overwrite data
-        try (StreamTableWrite write = 
table.newWrite(commitUser).withIgnorePreviousFiles(true);
-                InnerTableCommit commit = table.newCommit(commitUser)) {
-            for (InternalRow row : overwriteData) {
-                write.write(row);
-            }
-            commit.withOverwrite(overwritePartition).commit(1, 
write.prepareCommit(true, 1));
-        }
-
-        // validate
-        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
-        TableRead read = table.newRead();
-        assertThat(
-                        getResult(
-                                read,
-                                splits,
-                                row ->
-                                        DataFormatTestUtil.toStringNoRowKind(
-                                                row, OVERWRITE_TEST_ROW_TYPE)))
-                .hasSameElementsAs(expected);
-    }
-
     @Test
     public void testOverwrite() throws Exception {
         FileStoreTable table = createFileStoreTable();
@@ -1291,7 +1226,7 @@ public abstract class FileStoreTableTestBase {
     public void testFastForward() throws Exception {
         FileStoreTable table = createFileStoreTable();
         generateBranch(table);
-        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+        FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
 
         // Verify branch1 and the main branch have the same data
         assertThat(
@@ -1619,7 +1554,7 @@ public abstract class FileStoreTableTestBase {
 
         generateBranch(table);
 
-        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+        FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
         // Write data to branch1
         try (StreamTableWrite write = tableBranch.newWrite(commitUser);
                 StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
@@ -1728,7 +1663,7 @@ public abstract class FileStoreTableTestBase {
         }
     }
 
-    protected List<String> getResult(
+    public static List<String> getResult(
             TableRead read,
             List<Split> splits,
             BinaryRow partition,
@@ -1738,7 +1673,7 @@ public abstract class FileStoreTableTestBase {
         return getResult(read, getSplitsFor(splits, partition, bucket), 
rowDataToString);
     }
 
-    protected List<String> getResult(
+    public static List<String> getResult(
             TableRead read, List<Split> splits, Function<InternalRow, String> 
rowDataToString)
             throws Exception {
         List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
@@ -1756,7 +1691,7 @@ public abstract class FileStoreTableTestBase {
         return result;
     }
 
-    private List<Split> getSplitsFor(List<Split> splits, BinaryRow partition, 
int bucket) {
+    private static List<Split> getSplitsFor(List<Split> splits, BinaryRow 
partition, int bucket) {
         List<Split> result = new ArrayList<>();
         for (Split split : splits) {
             DataSplit dataSplit = (DataSplit) split;
@@ -1808,12 +1743,9 @@ public abstract class FileStoreTableTestBase {
         return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
     }
 
-    protected FileStoreTable createFileStoreTable(String branch, int 
numOfBucket) throws Exception {
-        return createFileStoreTable(branch, conf -> conf.set(BUCKET, 
numOfBucket));
-    }
-
-    protected FileStoreTable createFileStoreTable(String branch) throws 
Exception {
-        return createFileStoreTable(branch, 1);
+    protected FileStoreTable createBranchTable(String branch) throws Exception 
{
+        FileStoreTable table = createFileStoreTable();
+        return table.switchToBranch(branch);
     }
 
     protected FileStoreTable createFileStoreTable() throws Exception {
@@ -1827,99 +1759,6 @@ public abstract class FileStoreTableTestBase {
     protected abstract FileStoreTable createFileStoreTable(
             Consumer<Options> configure, RowType rowType) throws Exception;
 
-    protected abstract FileStoreTable createFileStoreTable(
-            String branch, Consumer<Options> configure) throws Exception;
-
-    protected abstract FileStoreTable overwriteTestFileStoreTable() throws 
Exception;
-
-    private static InternalRow overwriteRow(Object... values) {
-        return GenericRow.of(
-                values[0],
-                values[1],
-                BinaryString.fromString((String) values[2]),
-                BinaryString.fromString((String) values[3]));
-    }
-
-    private static List<Arguments> overwriteTestData() {
-        // dynamic, overwrite data, overwrite partition, expected
-        return Arrays.asList(
-                // nothing happen
-                arguments(
-                        true,
-                        Collections.emptyList(),
-                        Collections.emptyMap(),
-                        Arrays.asList(
-                                "1, 1, A, Hi",
-                                "2, 1, A, Hello",
-                                "3, 1, A, World",
-                                "4, 1, B, To",
-                                "5, 1, B, Apache",
-                                "6, 1, B, Paimon",
-                                "7, 2, A, Test",
-                                "8, 2, B, Case")),
-                // delete all data
-                arguments(
-                        false,
-                        Collections.emptyList(),
-                        Collections.emptyMap(),
-                        Collections.emptyList()),
-                // specify one partition key
-                arguments(
-                        true,
-                        Arrays.asList(
-                                overwriteRow(1, 1, "A", "Where"), 
overwriteRow(2, 1, "A", "When")),
-                        Collections.singletonMap("pt0", "1"),
-                        Arrays.asList(
-                                "1, 1, A, Where",
-                                "2, 1, A, When",
-                                "4, 1, B, To",
-                                "5, 1, B, Apache",
-                                "6, 1, B, Paimon",
-                                "7, 2, A, Test",
-                                "8, 2, B, Case")),
-                arguments(
-                        false,
-                        Arrays.asList(
-                                overwriteRow(1, 1, "A", "Where"), 
overwriteRow(2, 1, "A", "When")),
-                        Collections.singletonMap("pt0", "1"),
-                        Arrays.asList(
-                                "1, 1, A, Where",
-                                "2, 1, A, When",
-                                "7, 2, A, Test",
-                                "8, 2, B, Case")),
-                // all dynamic
-                arguments(
-                        true,
-                        Arrays.asList(
-                                overwriteRow(4, 1, "B", "Where"),
-                                overwriteRow(5, 1, "B", "When"),
-                                overwriteRow(10, 2, "A", "Static"),
-                                overwriteRow(11, 2, "A", "Dynamic")),
-                        Collections.emptyMap(),
-                        Arrays.asList(
-                                "1, 1, A, Hi",
-                                "2, 1, A, Hello",
-                                "3, 1, A, World",
-                                "4, 1, B, Where",
-                                "5, 1, B, When",
-                                "10, 2, A, Static",
-                                "11, 2, A, Dynamic",
-                                "8, 2, B, Case")),
-                arguments(
-                        false,
-                        Arrays.asList(
-                                overwriteRow(4, 1, "B", "Where"),
-                                overwriteRow(5, 1, "B", "When"),
-                                overwriteRow(10, 2, "A", "Static"),
-                                overwriteRow(11, 2, "A", "Dynamic")),
-                        Collections.emptyMap(),
-                        Arrays.asList(
-                                "4, 1, B, Where",
-                                "5, 1, B, When",
-                                "10, 2, A, Static",
-                                "11, 2, A, Dynamic")));
-    }
-
     protected List<Split> toSplits(List<DataSplit> dataSplits) {
         return new ArrayList<>(dataSplits);
     }
@@ -1947,7 +1786,7 @@ public abstract class FileStoreTableTestBase {
         assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();
 
         // Verify branch1 and the main branch have the same data
-        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+        FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
         assertThat(
                         getResult(
                                 tableBranch.newRead(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
deleted file mode 100644
index 13f692ac7e..0000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.FileIOFinder;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.function.Consumer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link PrimaryKeyFileStoreTable}. */
-public class WritePreemptMemoryTest extends FileStoreTableTestBase {
-
-    @Test
-    public void writeMultiplePartitions() throws Exception {
-        testWritePreemptMemory(false);
-    }
-
-    @Test
-    public void writeSinglePartition() throws Exception {
-        testWritePreemptMemory(true);
-    }
-
-    @Override // this has been tested in PrimaryKeyFileStoreTableTest
-    @Test
-    public void testReadFilter() {}
-
-    private void testWritePreemptMemory(boolean singlePartition) throws 
Exception {
-        // write
-        FileStoreTable table = createFileStoreTable();
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
-        Random random = new Random();
-        List<String> expected = new ArrayList<>();
-        for (int i = 0; i < 10_000; i++) {
-            GenericRow row = rowData(singlePartition ? 0 : random.nextInt(5), 
i, i * 10L);
-            write.write(row);
-            expected.add(BATCH_ROW_TO_STRING.apply(row));
-        }
-        commit.commit(0, write.prepareCommit(true, 0));
-        write.close();
-        commit.close();
-
-        // read
-        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
-        TableRead read = table.newRead();
-        List<String> results = new ArrayList<>();
-        for (int i = 0; i < 5; i++) {
-            results.addAll(getResult(read, splits, binaryRow(i), 0, 
BATCH_ROW_TO_STRING));
-        }
-        assertThat(results).containsExactlyInAnyOrder(expected.toArray(new 
String[0]));
-    }
-
-    @Override
-    protected FileStoreTable createFileStoreTable(Consumer<Options> configure, 
RowType rowType)
-            throws Exception {
-        Options options = new Options();
-        options.set(CoreOptions.BUCKET, 1);
-        options.set(CoreOptions.PATH, tablePath.toString());
-        // Run with minimal memory to ensure a more intense preempt
-        // Currently a writer needs at least one page
-        int pages = 10;
-        options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 
1024));
-        options.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
-        configure.accept(options);
-        TableSchema schema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(
-                                rowType.getFields(),
-                                Collections.singletonList("pt"),
-                                Arrays.asList("pt", "a"),
-                                options.toMap(),
-                                ""));
-        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, schema);
-    }
-
-    @Override
-    protected FileStoreTable createFileStoreTable(String branch, 
Consumer<Options> configure)
-            throws Exception {
-        Options options = new Options();
-        options.set(CoreOptions.BUCKET, 1);
-        options.set(CoreOptions.PATH, tablePath.toString());
-        // Run with minimal memory to ensure a more intense preempt
-        // Currently a writer needs at least one page
-        int pages = 10;
-        options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 
1024));
-        options.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
-        options.set(CoreOptions.BRANCH, branch);
-        configure.accept(options);
-        TableSchema schema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath, 
branch),
-                        new Schema(
-                                ROW_TYPE.getFields(),
-                                Collections.singletonList("pt"),
-                                Arrays.asList("pt", "a"),
-                                options.toMap(),
-                                ""));
-        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, schema);
-    }
-
-    @Override
-    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
-        Options conf = new Options();
-        conf.set(CoreOptions.PATH, tablePath.toString());
-        conf.set(CoreOptions.BUCKET, 1);
-        // Run with minimal memory to ensure a more intense preempt
-        // Currently a writer needs at least one page
-        int pages = 10;
-        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 1024));
-        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
-        TableSchema schema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(
-                                OVERWRITE_TEST_ROW_TYPE.getFields(),
-                                Arrays.asList("pt0", "pt1"),
-                                Arrays.asList("pk", "pt0", "pt1"),
-                                conf.toMap(),
-                                ""));
-        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, schema);
-    }
-}


Reply via email to