This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aed1830de [flink] Use new ReadBuilder and WriteBuilder API in tests of paimon-flink module (#719)
aed1830de is described below
commit aed1830de2078a89c9a556df439f6a971aaf660e
Author: Tyrantlucifer <[email protected]>
AuthorDate: Thu Apr 20 14:21:09 2023 +0800
[flink] Use new ReadBuilder and WriteBuilder API in tests of paimon-flink module (#719)
---
.../paimon/flink/action/CompactActionITCase.java | 17 ++++++++-----
.../paimon/flink/action/DeleteActionITCase.java | 7 ++++--
.../flink/action/DropPartitionActionITCase.java | 17 ++++++++-----
.../action/cdc/mysql/MySqlActionITCaseBase.java | 6 +++--
.../paimon/flink/sink/CommitterOperatorTest.java | 29 ++++++++++++++++------
.../flink/sink/CommitterOperatorTestBase.java | 5 ++--
.../paimon/flink/sink/CompactorSinkITCase.java | 9 ++++---
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 6 +++--
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 6 +++--
.../paimon/flink/source/CompactorSourceITCase.java | 19 +++++++++-----
10 files changed, 82 insertions(+), 39 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index db4e107fd..cd363210c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
@@ -70,8 +71,10 @@ public class CompactActionITCase extends ActionITCaseBase {
Arrays.asList("dt", "hh", "k"),
options);
snapshotManager = table.snapshotManager();
- write = table.newWrite(commitUser);
- commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
writeData(
rowData(1, 100, 15, BinaryString.fromString("20221208")),
@@ -132,8 +135,10 @@ public class CompactActionITCase extends ActionITCaseBase {
Arrays.asList("dt", "hh", "k"),
options);
snapshotManager = table.snapshotManager();
- write = table.newWrite(commitUser);
- commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
// base records
writeData(
@@ -146,7 +151,7 @@ public class CompactActionITCase extends ActionITCaseBase {
Assertions.assertEquals(Snapshot.CommitKind.APPEND,
snapshot.commitKind());
// no full compaction has happened, so plan should be empty
- StreamTableScan scan = table.newStreamScan();
+ StreamTableScan scan = table.newReadBuilder().newStreamScan();
TableScan.Plan plan = scan.plan();
Assertions.assertTrue(plan.splits().isEmpty());
@@ -215,7 +220,7 @@ public class CompactActionITCase extends ActionITCaseBase {
long start = System.currentTimeMillis();
while (actual.size() != expected.size()) {
TableScan.Plan plan = scan.plan();
- actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
+ actual.addAll(getResult(table.newReadBuilder().newRead(),
plan.splits(), ROW_TYPE));
if (System.currentTimeMillis() - start > timeout) {
break;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index 57617de66..18cf0d9a1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -150,8 +151,10 @@ public class DeleteActionITCase extends ActionITCaseBase {
hasPk ? Collections.singletonList("k") :
Collections.emptyList(),
new HashMap<>());
snapshotManager = table.snapshotManager();
- write = table.newWrite(commitUser);
- commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
// prepare data
writeData(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index 65bcf6ba7..5ed895264 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -62,9 +64,9 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
assertThat(snapshot.id()).isEqualTo(5);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
- TableScan.Plan plan = table.newScan().plan();
+ TableScan.Plan plan = table.newReadBuilder().newScan().plan();
assertThat(plan.splits().size()).isEqualTo(2);
- List<String> actual = getResult(table.newRead(), plan.splits(),
ROW_TYPE);
+ List<String> actual = getResult(table.newReadBuilder().newRead(),
plan.splits(), ROW_TYPE);
List<String> expected;
if (hasPk) {
@@ -109,9 +111,10 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
assertThat(snapshot.id()).isEqualTo(5);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
- TableScan.Plan plan = table.newScan().plan();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
assertThat(plan.splits().size()).isEqualTo(2);
- List<String> actual = getResult(table.newRead(), plan.splits(),
ROW_TYPE);
+ List<String> actual = getResult(readBuilder.newRead(), plan.splits(),
ROW_TYPE);
List<String> expected;
if (hasPk) {
@@ -146,8 +149,10 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
: Collections.emptyList(),
new HashMap<>());
snapshotManager = table.snapshotManager();
- write = table.newWrite(commitUser);
- commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
// prepare data
writeData(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index cff787528..4b5adc79b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
@@ -101,10 +102,11 @@ public class MySqlActionITCaseBase extends
ActionITCaseBase {
List<String> sortedExpected = new ArrayList<>(expected);
Collections.sort(sortedExpected);
while (true) {
- TableScan.Plan plan = table.newScan().plan();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
List<String> result =
getResult(
- table.newRead(),
+ readBuilder.newRead(),
plan == null ? Collections.emptyList() :
plan.splits(),
rowType);
List<String> sortedActual = new ArrayList<>(result);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 16c896e16..23da4d0da 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -25,6 +25,7 @@ import
org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.api.common.ExecutionConfig;
@@ -62,8 +63,8 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
createRecoverableTestHarness(table);
testHarness.open();
-
- StreamTableWrite write = table.newWrite(initialCommitUser);
+ StreamTableWrite write =
+
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
write.write(GenericRow.of(1, 10L));
write.write(GenericRow.of(2, 20L));
@@ -111,7 +112,8 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
long cpId = 0;
for (int i = 0; i < 10; i++) {
cpId++;
- StreamTableWrite write = table.newWrite(initialCommitUser);
+ StreamTableWrite write =
+
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
write.write(GenericRow.of(1, 10L));
write.write(GenericRow.of(2, 20L));
for (CommitMessage committable : write.prepareCommit(false, cpId))
{
@@ -144,8 +146,10 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
long timestamp = 1;
+ StreamWriteBuilder streamWriteBuilder =
+
table.newStreamWriteBuilder().withCommitUser(initialCommitUser);
// this checkpoint is notified, should be committed
- StreamTableWrite write = table.newWrite(initialCommitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
write.write(GenericRow.of(1, 10L));
write.write(GenericRow.of(2, 20L));
for (CommitMessage committable : write.prepareCommit(false, 1)) {
@@ -173,7 +177,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
testHarness.open();
// this checkpoint is notified, should be committed
- write = table.newWrite(initialCommitUser);
+ write = streamWriteBuilder.newWrite();
write.write(GenericRow.of(5, 50L));
write.write(GenericRow.of(6, 60L));
for (CommitMessage committable : write.prepareCommit(false, 3)) {
@@ -197,7 +201,8 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
createRecoverableTestHarness(table);
testHarness.open();
long timestamp = 0;
- StreamTableWrite write = table.newWrite(initialCommitUser);
+ StreamTableWrite write =
+
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
long cpId = 1;
write.write(GenericRow.of(1, 10L));
@@ -232,7 +237,11 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitterOperator(
true,
initialCommitUser,
- user -> new StoreCommitter(table.newCommit(user)),
+ user ->
+ new StoreCommitter(
+ table.newStreamWriteBuilder()
+ .withCommitUser(user)
+ .newCommit()),
new RestoreAndFailCommittableStateManager(
() ->
new VersionedSerializerWrapper<>(
@@ -246,7 +255,11 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitterOperator(
true,
initialCommitUser,
- user -> new StoreCommitter(table.newCommit(user)),
+ user ->
+ new StoreCommitter(
+ table.newStreamWriteBuilder()
+ .withCommitUser(user)
+ .newCommit()),
new NoopCommittableStateManager());
return createTestHarness(operator);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
index 054ab27fe..c383ff063 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
@@ -61,9 +61,10 @@ public abstract class CommitterOperatorTestBase {
}
protected void assertResults(FileStoreTable table, String... expected) {
- TableRead read = table.newRead();
+ TableRead read = table.newReadBuilder().newRead();
List<String> actual = new ArrayList<>();
- table.newScan()
+ table.newReadBuilder()
+ .newScan()
.plan()
.splits()
.forEach(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 3739f7bc0..75d6e9b52 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
@@ -80,8 +81,10 @@ public class CompactorSinkITCase extends AbstractTestBase {
public void testCompact() throws Exception {
FileStoreTable table = createFileStoreTable();
SnapshotManager snapshotManager = table.snapshotManager();
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
@@ -117,7 +120,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
assertEquals(3, snapshot.id());
assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
- TableScan.Plan plan = table.newScan().plan();
+ TableScan.Plan plan = table.newReadBuilder().newScan().plan();
assertEquals(3, plan.splits().size());
for (Split split : plan.splits()) {
DataSplit dataSplit = (DataSplit) split;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 6d8e93daf..1df02a2f1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
@@ -157,9 +158,10 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
TableSchema schema = schemaManager.latest().get();
- TableScan.Plan plan = table.newScan().plan();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
try (RecordReaderIterator<InternalRow> it =
- new
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+ new
RecordReaderIterator<>(readBuilder.newRead().createReader(plan))) {
testTables.get(i).assertResult(schema, it);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 45d6b34e5..101c6390f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
@@ -122,9 +123,10 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
TableSchema schema = schemaManager.latest().get();
- TableScan.Plan plan = table.newScan().plan();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
try (RecordReaderIterator<InternalRow> it =
- new
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+ new
RecordReaderIterator<>(readBuilder.newRead().createReader(plan))) {
testTable.assertResult(schema, it);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 2e8ab836f..b72729594 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -89,8 +90,10 @@ public class CompactorSourceITCase extends AbstractTestBase {
// change options to test whether CompactorSourceBuilder work
normally
table =
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
}
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
write.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));
@@ -138,8 +141,10 @@ public class CompactorSourceITCase extends
AbstractTestBase {
dynamicOptions.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
table = table.copy(dynamicOptions);
}
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
write.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));
@@ -234,8 +239,10 @@ public class CompactorSourceITCase extends
AbstractTestBase {
List<String> expected)
throws Exception {
FileStoreTable table = createFileStoreTable();
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
write.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));