This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 66430f4b13 [test] Rename to SimpleTableTestBase and refactor overwrite
tests (#5334)
66430f4b13 is described below
commit 66430f4b136a3f57c093c8b83f16ba4358d4afb9
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 24 13:37:53 2025 +0800
[test] Rename to SimpleTableTestBase and refactor overwrite tests (#5334)
---
...bleTest.java => AppendOnlySimpleTableTest.java} | 44 +---
.../apache/paimon/table/OverwriteTableTest.java | 232 +++++++++++++++++++++
...bleTest.java => PrimaryKeySimpleTableTest.java} | 104 ++++-----
...TableTestBase.java => SimpleTableTestBase.java} | 189 ++---------------
.../paimon/table/WritePreemptMemoryTest.java | 161 --------------
5 files changed, 295 insertions(+), 435 deletions(-)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
similarity index 96%
rename from
paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 85ab7484f1..715584cdc3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -94,7 +94,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link AppendOnlyFileStoreTable}. */
-public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
+public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
@Test
public void testMultipleWriters() throws Exception {
@@ -218,7 +218,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
writeBranchData(tableBranch);
List<Split> splits =
toSplits(tableBranch.newSnapshotReader().read().dataSplits());
TableRead read = tableBranch.newRead();
@@ -408,7 +408,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
writeBranchData(tableBranch);
List<Split> splits =
@@ -1198,44 +1198,6 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
- @Override
- protected FileStoreTable createFileStoreTable(String branch,
Consumer<Options> configure)
- throws Exception {
- Options conf = new Options();
- conf.set(CoreOptions.PATH, tablePath.toString());
- conf.set(CoreOptions.BRANCH, branch);
- configure.accept(conf);
- if (!conf.contains(BUCKET_KEY) && conf.get(BUCKET) != -1) {
- conf.set(BUCKET_KEY, "a");
- }
- TableSchema tableSchema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath,
branch),
- new Schema(
- ROW_TYPE.getFields(),
- Collections.singletonList("pt"),
- Collections.emptyList(),
- conf.toMap(),
- ""));
- return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
- }
-
- @Override
- protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
- Options conf = new Options();
- conf.set(CoreOptions.PATH, tablePath.toString());
- TableSchema tableSchema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(
- OVERWRITE_TEST_ROW_TYPE.getFields(),
- Arrays.asList("pt0", "pt1"),
- Collections.emptyList(),
- conf.toMap(),
- ""));
- return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
- }
-
protected FileStoreTable
createUnawareBucketFileStoreTable(Consumer<Options> configure)
throws Exception {
Options conf = new Options();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/OverwriteTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/OverwriteTableTest.java
new file mode 100644
index 0000000000..eccadd38b2
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/OverwriteTableTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataFormatTestUtil;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.table.SimpleTableTestBase.getResult;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+/** Unit tests for overwrite table. */
+public class OverwriteTableTest extends TableTestBase {
+
+ @ParameterizedTest(name = "dynamic = {0}, partition={2}")
+ @MethodSource("overwriteTestData")
+ public void testOverwriteAppend(
+ boolean dynamicPartitionOverwrite,
+ List<InternalRow> overwriteData,
+ Map<String, String> overwritePartition,
+ List<String> expected)
+ throws Exception {
+ innerTestOverwrite(
+ false, dynamicPartitionOverwrite, overwriteData,
overwritePartition, expected);
+ }
+
+ @ParameterizedTest(name = "dynamic = {0}, partition={2}")
+ @MethodSource("overwriteTestData")
+ public void testOverwritePrimaryKey(
+ boolean dynamicPartitionOverwrite,
+ List<InternalRow> overwriteData,
+ Map<String, String> overwritePartition,
+ List<String> expected)
+ throws Exception {
+ innerTestOverwrite(
+ true, dynamicPartitionOverwrite, overwriteData,
overwritePartition, expected);
+ }
+
+ private void innerTestOverwrite(
+ boolean withPrimaryKey,
+ boolean dynamicPartitionOverwrite,
+ List<InternalRow> overwriteData,
+ Map<String, String> overwritePartition,
+ List<String> expected)
+ throws Exception {
+ Identifier identifier = identifier("T");
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt0", DataTypes.INT())
+ .column("pt1", DataTypes.STRING())
+ .column("v", DataTypes.STRING())
+ .partitionKeys("pt0", "pt1");
+ if (withPrimaryKey) {
+ builder = builder.primaryKey("pk", "pt0", "pt1");
+ builder.option(BUCKET.key(), "1");
+ }
+ catalog.createTable(identifier, builder.build(), true);
+ Table originTable = catalog.getTable(identifier("T"));
+ FileStoreTable table = (FileStoreTable) originTable;
+ if (!dynamicPartitionOverwrite) {
+ table =
+ table.copy(
+ Collections.singletonMap(
+
CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
+ }
+
+ // prepare data
+ // (1, 1, 'A', 'Hi'), (2, 1, 'A', 'Hello'), (3, 1, 'A', 'World'),
+ // (4, 1, 'B', 'To'), (5, 1, 'B', 'Apache'), (6, 1, 'B', 'Paimon')
+ // (7, 2, 'A', 'Test')
+ // (8, 2, 'B', 'Case')
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ InnerTableCommit commit = table.newCommit(commitUser)) {
+ write.write(overwriteRow(1, 1, "A", "Hi"));
+ write.write(overwriteRow(2, 1, "A", "Hello"));
+ write.write(overwriteRow(3, 1, "A", "World"));
+ write.write(overwriteRow(4, 1, "B", "To"));
+ write.write(overwriteRow(5, 1, "B", "Apache"));
+ write.write(overwriteRow(6, 1, "B", "Paimon"));
+ write.write(overwriteRow(7, 2, "A", "Test"));
+ write.write(overwriteRow(8, 2, "B", "Case"));
+ commit.commit(0, write.prepareCommit(true, 0));
+ }
+
+ // overwrite data
+ try (StreamTableWrite write =
table.newWrite(commitUser).withIgnorePreviousFiles(true);
+ InnerTableCommit commit = table.newCommit(commitUser)) {
+ for (InternalRow row : overwriteData) {
+ write.write(row);
+ }
+ commit.withOverwrite(overwritePartition).commit(1,
write.prepareCommit(true, 1));
+ }
+
+ // validate
+ List<Split> splits = new
ArrayList<>(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newRead();
+ assertThat(
+ getResult(
+ read,
+ splits,
+ row ->
+ DataFormatTestUtil.toStringNoRowKind(
+ row, originTable.rowType())))
+ .hasSameElementsAs(expected);
+ }
+
+ private static List<Arguments> overwriteTestData() {
+ // dynamic, overwrite data, overwrite partition, expected
+ return Arrays.asList(
+ // nothing happens
+ arguments(
+ true,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Arrays.asList(
+ "1, 1, A, Hi",
+ "2, 1, A, Hello",
+ "3, 1, A, World",
+ "4, 1, B, To",
+ "5, 1, B, Apache",
+ "6, 1, B, Paimon",
+ "7, 2, A, Test",
+ "8, 2, B, Case")),
+ // delete all data
+ arguments(
+ false,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Collections.emptyList()),
+ // specify one partition key
+ arguments(
+ true,
+ Arrays.asList(
+ overwriteRow(1, 1, "A", "Where"),
overwriteRow(2, 1, "A", "When")),
+ Collections.singletonMap("pt0", "1"),
+ Arrays.asList(
+ "1, 1, A, Where",
+ "2, 1, A, When",
+ "4, 1, B, To",
+ "5, 1, B, Apache",
+ "6, 1, B, Paimon",
+ "7, 2, A, Test",
+ "8, 2, B, Case")),
+ arguments(
+ false,
+ Arrays.asList(
+ overwriteRow(1, 1, "A", "Where"),
overwriteRow(2, 1, "A", "When")),
+ Collections.singletonMap("pt0", "1"),
+ Arrays.asList(
+ "1, 1, A, Where",
+ "2, 1, A, When",
+ "7, 2, A, Test",
+ "8, 2, B, Case")),
+ // all dynamic
+ arguments(
+ true,
+ Arrays.asList(
+ overwriteRow(4, 1, "B", "Where"),
+ overwriteRow(5, 1, "B", "When"),
+ overwriteRow(10, 2, "A", "Static"),
+ overwriteRow(11, 2, "A", "Dynamic")),
+ Collections.emptyMap(),
+ Arrays.asList(
+ "1, 1, A, Hi",
+ "2, 1, A, Hello",
+ "3, 1, A, World",
+ "4, 1, B, Where",
+ "5, 1, B, When",
+ "10, 2, A, Static",
+ "11, 2, A, Dynamic",
+ "8, 2, B, Case")),
+ arguments(
+ false,
+ Arrays.asList(
+ overwriteRow(4, 1, "B", "Where"),
+ overwriteRow(5, 1, "B", "When"),
+ overwriteRow(10, 2, "A", "Static"),
+ overwriteRow(11, 2, "A", "Dynamic")),
+ Collections.emptyMap(),
+ Arrays.asList(
+ "4, 1, B, Where",
+ "5, 1, B, When",
+ "10, 2, A, Static",
+ "11, 2, A, Dynamic")));
+ }
+
+ private static InternalRow overwriteRow(Object... values) {
+ return GenericRow.of(
+ values[0],
+ values[1],
+ BinaryString.fromString((String) values[2]),
+ BinaryString.fromString((String) values[3]));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
similarity index 97%
rename from
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 94bc52305d..40b8498efc 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -96,7 +96,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.BRANCH;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
@@ -124,19 +123,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link PrimaryKeyFileStoreTable}. */
-public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
-
- protected static final Function<InternalRow, String>
COMPATIBILITY_BATCH_ROW_TO_STRING =
- rowData ->
- rowData.getInt(0)
- + "|"
- + rowData.getInt(1)
- + "|"
- + rowData.getLong(2)
- + "|"
- + new String(rowData.getBinary(3))
- + "|"
- + new String(rowData.getBinary(4));
+public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
@Test
public void testMultipleWriters() throws Exception {
@@ -320,7 +307,7 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
public void testBranchBatchReadWrite() throws Exception {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
writeBranchData(tableBranch);
List<Split> splits =
toSplits(tableBranch.newSnapshotReader().read().dataSplits());
TableRead read = tableBranch.newRead();
@@ -402,7 +389,7 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
writeBranchData(tableBranch);
List<Split> splits =
@@ -2210,21 +2197,49 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
}
}
- @Override
- protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
- Options conf = new Options();
- conf.set(CoreOptions.PATH, tablePath.toString());
- conf.set(BUCKET, 1);
- TableSchema tableSchema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(
- OVERWRITE_TEST_ROW_TYPE.getFields(),
- Arrays.asList("pt0", "pt1"),
- Arrays.asList("pk", "pt0", "pt1"),
- conf.toMap(),
- ""));
- return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
+ @Test
+ public void writeMultiplePartitions() throws Exception {
+ testWritePreemptMemory(false);
+ }
+
+ @Test
+ public void writeSinglePartition() throws Exception {
+ testWritePreemptMemory(true);
+ }
+
+ private void testWritePreemptMemory(boolean singlePartition) throws
Exception {
+ // write
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ // Run with minimal memory to ensure more intense memory preemption
+ // Currently a writer needs at least one page
+ int pages = 10;
+ options.set(
+ CoreOptions.WRITE_BUFFER_SIZE, new
MemorySize(pages * 1024));
+ options.set(CoreOptions.PAGE_SIZE, new
MemorySize(1024));
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ Random random = new Random();
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 10_000; i++) {
+ GenericRow row = rowData(singlePartition ? 0 : random.nextInt(5),
i, i * 10L);
+ write.write(row);
+ expected.add(BATCH_ROW_TO_STRING.apply(row));
+ }
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+ commit.close();
+
+ // read
+ List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newRead();
+ List<String> results = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ results.addAll(getResult(read, splits, binaryRow(i), 0,
BATCH_ROW_TO_STRING));
+ }
+ assertThat(results).containsExactlyInAnyOrder(expected.toArray(new
String[0]));
}
@Override
@@ -2245,31 +2260,4 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
""));
return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
-
- @Override
- protected FileStoreTable createFileStoreTable(String branch,
Consumer<Options> configure)
- throws Exception {
- return createFileStoreTable(branch, configure, ROW_TYPE);
- }
-
- private FileStoreTable createFileStoreTable(
- String branch, Consumer<Options> configure, RowType rowType)
throws Exception {
- Options options = new Options();
- options.set(CoreOptions.PATH, tablePath.toString());
- options.set(BUCKET, 1);
- options.set(BRANCH, branch);
- configure.accept(options);
- TableSchema latestSchema =
- new SchemaManager(LocalFileIO.create(),
tablePath).latest().get();
- TableSchema tableSchema =
- new TableSchema(
- latestSchema.id(),
- latestSchema.fields(),
- latestSchema.highestFieldId(),
- latestSchema.partitionKeys(),
- latestSchema.primaryKeys(),
- options.toMap(),
- latestSchema.comment());
- return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
- }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
similarity index 90%
rename from
paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
rename to
paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 4f8e913b8f..cbf9e106ae 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -25,7 +25,6 @@ import org.apache.paimon.TestFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -78,8 +77,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
@@ -125,10 +122,12 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
-/** Base test class for {@link FileStoreTable}. */
-public abstract class FileStoreTableTestBase {
+/**
+ * Base test class for simple tables. Here "simple" means the table has a fixed schema; see
+ * {@link #ROW_TYPE}.
+ */
+public abstract class SimpleTableTestBase {
protected static final String BRANCH_NAME = "branch1";
@@ -145,14 +144,6 @@ public abstract class FileStoreTableTestBase {
},
new String[] {"pt", "a", "b", "c", "d", "e", "f"});
- // for overwrite test
- protected static final RowType OVERWRITE_TEST_ROW_TYPE =
- RowType.of(
- new DataType[] {
- DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(),
DataTypes.STRING()
- },
- new String[] {"pk", "pt0", "pt1", "v"});
-
protected static final int[] PROJECTION = new int[] {2, 1};
protected static final Function<InternalRow, String> BATCH_ROW_TO_STRING =
rowData ->
@@ -277,62 +268,6 @@ public abstract class FileStoreTableTestBase {
.containsExactlyElementsOf(expected);
}
- @ParameterizedTest(name = "dynamic = {0}, partition={2}")
- @MethodSource("overwriteTestData")
- public void testOverwriteNothing(
- boolean dynamicPartitionOverwrite,
- List<InternalRow> overwriteData,
- Map<String, String> overwritePartition,
- List<String> expected)
- throws Exception {
- FileStoreTable table = overwriteTestFileStoreTable();
- if (!dynamicPartitionOverwrite) {
- table =
- table.copy(
- Collections.singletonMap(
-
CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
- }
-
- // prepare data
- // (1, 1, 'A', 'Hi'), (2, 1, 'A', 'Hello'), (3, 1, 'A', 'World'),
- // (4, 1, 'B', 'To'), (5, 1, 'B', 'Apache'), (6, 1, 'B', 'Paimon')
- // (7, 2, 'A', 'Test')
- // (8, 2, 'B', 'Case')
- try (StreamTableWrite write = table.newWrite(commitUser);
- InnerTableCommit commit = table.newCommit(commitUser)) {
- write.write(overwriteRow(1, 1, "A", "Hi"));
- write.write(overwriteRow(2, 1, "A", "Hello"));
- write.write(overwriteRow(3, 1, "A", "World"));
- write.write(overwriteRow(4, 1, "B", "To"));
- write.write(overwriteRow(5, 1, "B", "Apache"));
- write.write(overwriteRow(6, 1, "B", "Paimon"));
- write.write(overwriteRow(7, 2, "A", "Test"));
- write.write(overwriteRow(8, 2, "B", "Case"));
- commit.commit(0, write.prepareCommit(true, 0));
- }
-
- // overwrite data
- try (StreamTableWrite write =
table.newWrite(commitUser).withIgnorePreviousFiles(true);
- InnerTableCommit commit = table.newCommit(commitUser)) {
- for (InternalRow row : overwriteData) {
- write.write(row);
- }
- commit.withOverwrite(overwritePartition).commit(1,
write.prepareCommit(true, 1));
- }
-
- // validate
- List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
- TableRead read = table.newRead();
- assertThat(
- getResult(
- read,
- splits,
- row ->
- DataFormatTestUtil.toStringNoRowKind(
- row, OVERWRITE_TEST_ROW_TYPE)))
- .hasSameElementsAs(expected);
- }
-
@Test
public void testOverwrite() throws Exception {
FileStoreTable table = createFileStoreTable();
@@ -1291,7 +1226,7 @@ public abstract class FileStoreTableTestBase {
public void testFastForward() throws Exception {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
// Verify branch1 and the main branch have the same data
assertThat(
@@ -1619,7 +1554,7 @@ public abstract class FileStoreTableTestBase {
generateBranch(table);
- FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
// Write data to branch1
try (StreamTableWrite write = tableBranch.newWrite(commitUser);
StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
@@ -1728,7 +1663,7 @@ public abstract class FileStoreTableTestBase {
}
}
- protected List<String> getResult(
+ public static List<String> getResult(
TableRead read,
List<Split> splits,
BinaryRow partition,
@@ -1738,7 +1673,7 @@ public abstract class FileStoreTableTestBase {
return getResult(read, getSplitsFor(splits, partition, bucket),
rowDataToString);
}
- protected List<String> getResult(
+ public static List<String> getResult(
TableRead read, List<Split> splits, Function<InternalRow, String>
rowDataToString)
throws Exception {
List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
@@ -1756,7 +1691,7 @@ public abstract class FileStoreTableTestBase {
return result;
}
- private List<Split> getSplitsFor(List<Split> splits, BinaryRow partition,
int bucket) {
+ private static List<Split> getSplitsFor(List<Split> splits, BinaryRow
partition, int bucket) {
List<Split> result = new ArrayList<>();
for (Split split : splits) {
DataSplit dataSplit = (DataSplit) split;
@@ -1808,12 +1743,9 @@ public abstract class FileStoreTableTestBase {
return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
}
- protected FileStoreTable createFileStoreTable(String branch, int
numOfBucket) throws Exception {
- return createFileStoreTable(branch, conf -> conf.set(BUCKET,
numOfBucket));
- }
-
- protected FileStoreTable createFileStoreTable(String branch) throws
Exception {
- return createFileStoreTable(branch, 1);
+ protected FileStoreTable createBranchTable(String branch) throws Exception
{
+ FileStoreTable table = createFileStoreTable();
+ return table.switchToBranch(branch);
}
protected FileStoreTable createFileStoreTable() throws Exception {
@@ -1827,99 +1759,6 @@ public abstract class FileStoreTableTestBase {
protected abstract FileStoreTable createFileStoreTable(
Consumer<Options> configure, RowType rowType) throws Exception;
- protected abstract FileStoreTable createFileStoreTable(
- String branch, Consumer<Options> configure) throws Exception;
-
- protected abstract FileStoreTable overwriteTestFileStoreTable() throws
Exception;
-
- private static InternalRow overwriteRow(Object... values) {
- return GenericRow.of(
- values[0],
- values[1],
- BinaryString.fromString((String) values[2]),
- BinaryString.fromString((String) values[3]));
- }
-
- private static List<Arguments> overwriteTestData() {
- // dynamic, overwrite data, overwrite partition, expected
- return Arrays.asList(
- // nothing happen
- arguments(
- true,
- Collections.emptyList(),
- Collections.emptyMap(),
- Arrays.asList(
- "1, 1, A, Hi",
- "2, 1, A, Hello",
- "3, 1, A, World",
- "4, 1, B, To",
- "5, 1, B, Apache",
- "6, 1, B, Paimon",
- "7, 2, A, Test",
- "8, 2, B, Case")),
- // delete all data
- arguments(
- false,
- Collections.emptyList(),
- Collections.emptyMap(),
- Collections.emptyList()),
- // specify one partition key
- arguments(
- true,
- Arrays.asList(
- overwriteRow(1, 1, "A", "Where"),
overwriteRow(2, 1, "A", "When")),
- Collections.singletonMap("pt0", "1"),
- Arrays.asList(
- "1, 1, A, Where",
- "2, 1, A, When",
- "4, 1, B, To",
- "5, 1, B, Apache",
- "6, 1, B, Paimon",
- "7, 2, A, Test",
- "8, 2, B, Case")),
- arguments(
- false,
- Arrays.asList(
- overwriteRow(1, 1, "A", "Where"),
overwriteRow(2, 1, "A", "When")),
- Collections.singletonMap("pt0", "1"),
- Arrays.asList(
- "1, 1, A, Where",
- "2, 1, A, When",
- "7, 2, A, Test",
- "8, 2, B, Case")),
- // all dynamic
- arguments(
- true,
- Arrays.asList(
- overwriteRow(4, 1, "B", "Where"),
- overwriteRow(5, 1, "B", "When"),
- overwriteRow(10, 2, "A", "Static"),
- overwriteRow(11, 2, "A", "Dynamic")),
- Collections.emptyMap(),
- Arrays.asList(
- "1, 1, A, Hi",
- "2, 1, A, Hello",
- "3, 1, A, World",
- "4, 1, B, Where",
- "5, 1, B, When",
- "10, 2, A, Static",
- "11, 2, A, Dynamic",
- "8, 2, B, Case")),
- arguments(
- false,
- Arrays.asList(
- overwriteRow(4, 1, "B", "Where"),
- overwriteRow(5, 1, "B", "When"),
- overwriteRow(10, 2, "A", "Static"),
- overwriteRow(11, 2, "A", "Dynamic")),
- Collections.emptyMap(),
- Arrays.asList(
- "4, 1, B, Where",
- "5, 1, B, When",
- "10, 2, A, Static",
- "11, 2, A, Dynamic")));
- }
-
protected List<Split> toSplits(List<DataSplit> dataSplits) {
return new ArrayList<>(dataSplits);
}
@@ -1947,7 +1786,7 @@ public abstract class FileStoreTableTestBase {
assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();
// Verify branch1 and the main branch have the same data
- FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ FileStoreTable tableBranch = createBranchTable(BRANCH_NAME);
assertThat(
getResult(
tableBranch.newRead(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
deleted file mode 100644
index 13f692ac7e..0000000000
---
a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.FileIOFinder;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.function.Consumer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link PrimaryKeyFileStoreTable}. */
-public class WritePreemptMemoryTest extends FileStoreTableTestBase {
-
- @Test
- public void writeMultiplePartitions() throws Exception {
- testWritePreemptMemory(false);
- }
-
- @Test
- public void writeSinglePartition() throws Exception {
- testWritePreemptMemory(true);
- }
-
- @Override // this has been tested in PrimaryKeyFileStoreTableTest
- @Test
- public void testReadFilter() {}
-
- private void testWritePreemptMemory(boolean singlePartition) throws
Exception {
- // write
- FileStoreTable table = createFileStoreTable();
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
- Random random = new Random();
- List<String> expected = new ArrayList<>();
- for (int i = 0; i < 10_000; i++) {
- GenericRow row = rowData(singlePartition ? 0 : random.nextInt(5),
i, i * 10L);
- write.write(row);
- expected.add(BATCH_ROW_TO_STRING.apply(row));
- }
- commit.commit(0, write.prepareCommit(true, 0));
- write.close();
- commit.close();
-
- // read
- List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
- TableRead read = table.newRead();
- List<String> results = new ArrayList<>();
- for (int i = 0; i < 5; i++) {
- results.addAll(getResult(read, splits, binaryRow(i), 0,
BATCH_ROW_TO_STRING));
- }
- assertThat(results).containsExactlyInAnyOrder(expected.toArray(new
String[0]));
- }
-
- @Override
- protected FileStoreTable createFileStoreTable(Consumer<Options> configure,
RowType rowType)
- throws Exception {
- Options options = new Options();
- options.set(CoreOptions.BUCKET, 1);
- options.set(CoreOptions.PATH, tablePath.toString());
- // Run with minimal memory to ensure a more intense preempt
- // Currently a writer needs at least one page
- int pages = 10;
- options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages *
1024));
- options.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
- configure.accept(options);
- TableSchema schema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(
- rowType.getFields(),
- Collections.singletonList("pt"),
- Arrays.asList("pt", "a"),
- options.toMap(),
- ""));
- return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, schema);
- }
-
- @Override
- protected FileStoreTable createFileStoreTable(String branch,
Consumer<Options> configure)
- throws Exception {
- Options options = new Options();
- options.set(CoreOptions.BUCKET, 1);
- options.set(CoreOptions.PATH, tablePath.toString());
- // Run with minimal memory to ensure a more intense preempt
- // Currently a writer needs at least one page
- int pages = 10;
- options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages *
1024));
- options.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
- options.set(CoreOptions.BRANCH, branch);
- configure.accept(options);
- TableSchema schema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath,
branch),
- new Schema(
- ROW_TYPE.getFields(),
- Collections.singletonList("pt"),
- Arrays.asList("pt", "a"),
- options.toMap(),
- ""));
- return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, schema);
- }
-
- @Override
- protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
- Options conf = new Options();
- conf.set(CoreOptions.PATH, tablePath.toString());
- conf.set(CoreOptions.BUCKET, 1);
- // Run with minimal memory to ensure a more intense preempt
- // Currently a writer needs at least one page
- int pages = 10;
- conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 1024));
- conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
- TableSchema schema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(
- OVERWRITE_TEST_ROW_TYPE.getFields(),
- Arrays.asList("pt0", "pt1"),
- Arrays.asList("pk", "pt0", "pt1"),
- conf.toMap(),
- ""));
- return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, schema);
- }
-}