This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aed1830de [flink] Use new ReadBuilder and WriteBuilder API in tests of 
paimon-flink module (#719)
aed1830de is described below

commit aed1830de2078a89c9a556df439f6a971aaf660e
Author: Tyrantlucifer <[email protected]>
AuthorDate: Thu Apr 20 14:21:09 2023 +0800

    [flink] Use new ReadBuilder and WriteBuilder API in tests of paimon-flink 
module (#719)
---
 .../paimon/flink/action/CompactActionITCase.java   | 17 ++++++++-----
 .../paimon/flink/action/DeleteActionITCase.java    |  7 ++++--
 .../flink/action/DropPartitionActionITCase.java    | 17 ++++++++-----
 .../action/cdc/mysql/MySqlActionITCaseBase.java    |  6 +++--
 .../paimon/flink/sink/CommitterOperatorTest.java   | 29 ++++++++++++++++------
 .../flink/sink/CommitterOperatorTestBase.java      |  5 ++--
 .../paimon/flink/sink/CompactorSinkITCase.java     |  9 ++++---
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   |  6 +++--
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      |  6 +++--
 .../paimon/flink/source/CompactorSourceITCase.java | 19 +++++++++-----
 10 files changed, 82 insertions(+), 39 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index db4e107fd..cd363210c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
@@ -70,8 +71,10 @@ public class CompactActionITCase extends ActionITCaseBase {
                         Arrays.asList("dt", "hh", "k"),
                         options);
         snapshotManager = table.snapshotManager();
-        write = table.newWrite(commitUser);
-        commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
 
         writeData(
                 rowData(1, 100, 15, BinaryString.fromString("20221208")),
@@ -132,8 +135,10 @@ public class CompactActionITCase extends ActionITCaseBase {
                         Arrays.asList("dt", "hh", "k"),
                         options);
         snapshotManager = table.snapshotManager();
-        write = table.newWrite(commitUser);
-        commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
 
         // base records
         writeData(
@@ -146,7 +151,7 @@ public class CompactActionITCase extends ActionITCaseBase {
         Assertions.assertEquals(Snapshot.CommitKind.APPEND, 
snapshot.commitKind());
 
         // no full compaction has happened, so plan should be empty
-        StreamTableScan scan = table.newStreamScan();
+        StreamTableScan scan = table.newReadBuilder().newStreamScan();
         TableScan.Plan plan = scan.plan();
         Assertions.assertTrue(plan.splits().isEmpty());
 
@@ -215,7 +220,7 @@ public class CompactActionITCase extends ActionITCaseBase {
         long start = System.currentTimeMillis();
         while (actual.size() != expected.size()) {
             TableScan.Plan plan = scan.plan();
-            actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
+            actual.addAll(getResult(table.newReadBuilder().newRead(), 
plan.splits(), ROW_TYPE));
 
             if (System.currentTimeMillis() - start > timeout) {
                 break;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index 57617de66..18cf0d9a1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -150,8 +151,10 @@ public class DeleteActionITCase extends ActionITCaseBase {
                         hasPk ? Collections.singletonList("k") : 
Collections.emptyList(),
                         new HashMap<>());
         snapshotManager = table.snapshotManager();
-        write = table.newWrite(commitUser);
-        commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
 
         // prepare data
         writeData(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index 65bcf6ba7..5ed895264 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -62,9 +64,9 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
         assertThat(snapshot.id()).isEqualTo(5);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
 
-        TableScan.Plan plan = table.newScan().plan();
+        TableScan.Plan plan = table.newReadBuilder().newScan().plan();
         assertThat(plan.splits().size()).isEqualTo(2);
-        List<String> actual = getResult(table.newRead(), plan.splits(), 
ROW_TYPE);
+        List<String> actual = getResult(table.newReadBuilder().newRead(), 
plan.splits(), ROW_TYPE);
 
         List<String> expected;
         if (hasPk) {
@@ -109,9 +111,10 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
         assertThat(snapshot.id()).isEqualTo(5);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
 
-        TableScan.Plan plan = table.newScan().plan();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
         assertThat(plan.splits().size()).isEqualTo(2);
-        List<String> actual = getResult(table.newRead(), plan.splits(), 
ROW_TYPE);
+        List<String> actual = getResult(readBuilder.newRead(), plan.splits(), 
ROW_TYPE);
 
         List<String> expected;
         if (hasPk) {
@@ -146,8 +149,10 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
                                 : Collections.emptyList(),
                         new HashMap<>());
         snapshotManager = table.snapshotManager();
-        write = table.newWrite(commitUser);
-        commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
 
         // prepare data
         writeData(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index cff787528..4b5adc79b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
@@ -101,10 +102,11 @@ public class MySqlActionITCaseBase extends 
ActionITCaseBase {
         List<String> sortedExpected = new ArrayList<>(expected);
         Collections.sort(sortedExpected);
         while (true) {
-            TableScan.Plan plan = table.newScan().plan();
+            ReadBuilder readBuilder = table.newReadBuilder();
+            TableScan.Plan plan = readBuilder.newScan().plan();
             List<String> result =
                     getResult(
-                            table.newRead(),
+                            readBuilder.newRead(),
                             plan == null ? Collections.emptyList() : 
plan.splits(),
                             rowType);
             List<String> sortedActual = new ArrayList<>(result);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 16c896e16..23da4d0da 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -25,6 +25,7 @@ import 
org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -62,8 +63,8 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
                 createRecoverableTestHarness(table);
         testHarness.open();
-
-        StreamTableWrite write = table.newWrite(initialCommitUser);
+        StreamTableWrite write =
+                
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
         write.write(GenericRow.of(1, 10L));
         write.write(GenericRow.of(2, 20L));
 
@@ -111,7 +112,8 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         long cpId = 0;
         for (int i = 0; i < 10; i++) {
             cpId++;
-            StreamTableWrite write = table.newWrite(initialCommitUser);
+            StreamTableWrite write =
+                    
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
             write.write(GenericRow.of(1, 10L));
             write.write(GenericRow.of(2, 20L));
             for (CommitMessage committable : write.prepareCommit(false, cpId)) 
{
@@ -144,8 +146,10 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
 
         long timestamp = 1;
 
+        StreamWriteBuilder streamWriteBuilder =
+                
table.newStreamWriteBuilder().withCommitUser(initialCommitUser);
         // this checkpoint is notified, should be committed
-        StreamTableWrite write = table.newWrite(initialCommitUser);
+        StreamTableWrite write = streamWriteBuilder.newWrite();
         write.write(GenericRow.of(1, 10L));
         write.write(GenericRow.of(2, 20L));
         for (CommitMessage committable : write.prepareCommit(false, 1)) {
@@ -173,7 +177,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         testHarness.open();
 
         // this checkpoint is notified, should be committed
-        write = table.newWrite(initialCommitUser);
+        write = streamWriteBuilder.newWrite();
         write.write(GenericRow.of(5, 50L));
         write.write(GenericRow.of(6, 60L));
         for (CommitMessage committable : write.prepareCommit(false, 3)) {
@@ -197,7 +201,8 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 createRecoverableTestHarness(table);
         testHarness.open();
         long timestamp = 0;
-        StreamTableWrite write = table.newWrite(initialCommitUser);
+        StreamTableWrite write =
+                
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
 
         long cpId = 1;
         write.write(GenericRow.of(1, 10L));
@@ -232,7 +237,11 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 new CommitterOperator(
                         true,
                         initialCommitUser,
-                        user -> new StoreCommitter(table.newCommit(user)),
+                        user ->
+                                new StoreCommitter(
+                                        table.newStreamWriteBuilder()
+                                                .withCommitUser(user)
+                                                .newCommit()),
                         new RestoreAndFailCommittableStateManager(
                                 () ->
                                         new VersionedSerializerWrapper<>(
@@ -246,7 +255,11 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 new CommitterOperator(
                         true,
                         initialCommitUser,
-                        user -> new StoreCommitter(table.newCommit(user)),
+                        user ->
+                                new StoreCommitter(
+                                        table.newStreamWriteBuilder()
+                                                .withCommitUser(user)
+                                                .newCommit()),
                         new NoopCommittableStateManager());
         return createTestHarness(operator);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
index 054ab27fe..c383ff063 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
@@ -61,9 +61,10 @@ public abstract class CommitterOperatorTestBase {
     }
 
     protected void assertResults(FileStoreTable table, String... expected) {
-        TableRead read = table.newRead();
+        TableRead read = table.newReadBuilder().newRead();
         List<String> actual = new ArrayList<>();
-        table.newScan()
+        table.newReadBuilder()
+                .newScan()
                 .plan()
                 .splits()
                 .forEach(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 3739f7bc0..75d6e9b52 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableScan;
@@ -80,8 +81,10 @@ public class CompactorSinkITCase extends AbstractTestBase {
     public void testCompact() throws Exception {
         FileStoreTable table = createFileStoreTable();
         SnapshotManager snapshotManager = table.snapshotManager();
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
 
         write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
         write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
@@ -117,7 +120,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
         assertEquals(3, snapshot.id());
         assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
 
-        TableScan.Plan plan = table.newScan().plan();
+        TableScan.Plan plan = table.newReadBuilder().newScan().plan();
         assertEquals(3, plan.splits().size());
         for (Split split : plan.splits()) {
             DataSplit dataSplit = (DataSplit) split;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 6d8e93daf..1df02a2f1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
@@ -157,9 +158,10 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
             SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
             TableSchema schema = schemaManager.latest().get();
 
-            TableScan.Plan plan = table.newScan().plan();
+            ReadBuilder readBuilder = table.newReadBuilder();
+            TableScan.Plan plan = readBuilder.newScan().plan();
             try (RecordReaderIterator<InternalRow> it =
-                    new 
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+                    new 
RecordReaderIterator<>(readBuilder.newRead().createReader(plan))) {
                 testTables.get(i).assertResult(schema, it);
             }
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 45d6b34e5..101c6390f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
@@ -122,9 +123,10 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
         SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
         TableSchema schema = schemaManager.latest().get();
 
-        TableScan.Plan plan = table.newScan().plan();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
         try (RecordReaderIterator<InternalRow> it =
-                new 
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+                new 
RecordReaderIterator<>(readBuilder.newRead().createReader(plan))) {
             testTable.assertResult(schema, it);
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 2e8ab836f..b72729594 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -89,8 +90,10 @@ public class CompactorSourceITCase extends AbstractTestBase {
             // change options to test whether CompactorSourceBuilder work 
normally
             table = 
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
         }
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
 
         write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
         write.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));
@@ -138,8 +141,10 @@ public class CompactorSourceITCase extends 
AbstractTestBase {
             dynamicOptions.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
             table = table.copy(dynamicOptions);
         }
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
 
         write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
         write.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));
@@ -234,8 +239,10 @@ public class CompactorSourceITCase extends 
AbstractTestBase {
             List<String> expected)
             throws Exception {
         FileStoreTable table = createFileStoreTable();
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
 
         write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
         write.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));

Reply via email to