This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push: new 3a48f6dd2 [test/flink] Collect flink rows should always be with timeout (#1622) 3a48f6dd2 is described below commit 3a48f6dd22bed4e6bf3ebe009f34750228aa0cd7 Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn> AuthorDate: Thu Sep 4 10:47:32 2025 +0800 [test/flink] Collect flink rows should always be with timeout (#1622) --- .../fluss/flink/sink/FlinkTableSinkITCase.java | 7 +- .../flink/source/FlinkTableSourceBatchITCase.java | 15 +-- .../fluss/flink/source/FlinkTableSourceITCase.java | 126 ++++++--------------- .../source/testutils/FlinkRowAssertionsUtils.java | 83 ++++++++++---- .../testutils/FlinkRowAssertionsUtilsTest.java | 76 +++++++++++++ .../apache/fluss/flink/utils/FlinkTestBase.java | 14 --- 6 files changed, 179 insertions(+), 142 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index 048ba42bc..54fb906ae 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -66,6 +66,7 @@ import java.util.stream.Stream; import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions; import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.assertj.core.api.Assertions.assertThat; @@ -210,11 +211,7 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { List<String> expectedRows = expectedGroups.stream().flatMap(List::stream).collect(Collectors.toList()); - List<String> actual = new ArrayList<>(expectedRows.size()); - for (int i = 0; i < expectedRows.size(); i++) { - actual.add(rowIter.next().toString()); - } - rowIter.close(); + List<String> actual = collectRowsWithTimeout(rowIter, expectedRows.size()); assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows); // check data with the same bucket key should be read in sequence. diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index a8353491e..7f3894f96 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -45,6 +45,7 @@ import java.util.Map; import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; @@ -189,7 +190,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { // normal scan String query = String.format("SELECT * FROM %s limit 2", tableName); CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect(); - List<String> collected = assertAndCollectRecords(iterRows, 2); + List<String> collected = collectRowsWithTimeout(iterRows, 2); List<String> expected = Arrays.asList( "+I[1, address1, name1]", @@ -203,14 +204,14 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { // limit which is larger than all the data. query = String.format("SELECT * FROM %s limit 10", tableName); iterRows = tEnv.executeSql(query).collect(); - collected = assertAndCollectRecords(iterRows, 5); + collected = collectRowsWithTimeout(iterRows, 5); assertThat(collected).isSubsetOf(expected); assertThat(collected).hasSize(5); // projection scan query = String.format("SELECT id, name FROM %s limit 3", tableName); iterRows = tEnv.executeSql(query).collect(); - collected = assertAndCollectRecords(iterRows, 3); + collected = collectRowsWithTimeout(iterRows, 3); expected = Arrays.asList( "+I[1, name1]", @@ -237,7 +238,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { // normal scan String query = String.format("SELECT * FROM %s limit 2", tableName); CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect(); - List<String> collected = assertAndCollectRecords(iterRows, 2); + List<String> collected = collectRowsWithTimeout(iterRows, 2); List<String> expected = Arrays.asList( "+I[1, address1, name1]", @@ -251,7 +252,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { // projection scan query = String.format("SELECT id, name FROM %s limit 3", tableName); iterRows = tEnv.executeSql(query).collect(); - collected = assertAndCollectRecords(iterRows, 3); + collected = collectRowsWithTimeout(iterRows, 3); expected = Arrays.asList( "+I[1, name1]", @@ -266,7 +267,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { String partitionTable = preparePartitionedLogTable(); query = String.format("SELECT id, name FROM %s limit 3", partitionTable); iterRows = tEnv.executeSql(query).collect(); - collected = assertAndCollectRecords(iterRows, 3); + collected = collectRowsWithTimeout(iterRows, 3); assertThat(collected).isSubsetOf(expected); assertThat(collected).hasSize(3); } @@ -286,7 +287,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { + "fields=[count1$0])", tableName)); CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect(); - List<String> collected = assertAndCollectRecords(iterRows, 1); + List<String> collected = collectRowsWithTimeout(iterRows, 1); List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows)); assertThat(collected).isEqualTo(expected); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 24757cb05..c279c15b3 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -70,7 +70,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions; import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; import static org.apache.fluss.flink.utils.FlinkTestBase.writeRowsToPartition; @@ -205,16 +207,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { writeRows(conn, tablePath, rows, true); List<String> expected = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]"); - try (org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql("select * from non_pk_table_test").collect()) { - int expectRecords = expected.size(); - List<String> actual = new ArrayList<>(expectRecords); - for (int i = 0; i < expectRecords; i++) { - String row = rowIter.next().toString(); - actual.add(row); - } - assertThat(actual).containsExactlyElementsOf(expected); - } + assertQueryResultExactOrder(tEnv, "select * from non_pk_table_test", expected); } @ParameterizedTest @@ -262,17 +255,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "+I[v8, 8000, 800]", "+I[v9, 9000, 900]", "+I[v10, 10000, 1000]"); - try (org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql(query).collect()) { - int expectRecords = expected.size(); - List<String> actual = new ArrayList<>(expectRecords); - for (int i = 0; i < expectRecords; i++) { - Row r = rowIter.next(); - String row = r.toString(); - actual.add(row); - } - assertThat(actual).containsExactlyElementsOf(expected); - } + assertQueryResultExactOrder(tEnv, query, expected); } @ParameterizedTest @@ -331,22 +314,15 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "+I[v8, 8, 800]", "+I[v9, 9, 900]", "+I[v10, 10, 1000]"); - try (org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql(query).collect()) { - int expectRecords = expected.size(); - List<String> actual = new ArrayList<>(expectRecords); - if (testPkLog) { - // delay the write after collect job start, - // to make sure reading from log instead of snapshot - writeRows(conn, tablePath, rows, false); - } - for (int i = 0; i < expectRecords; i++) { - Row r = rowIter.next(); - String row = r.toString(); - actual.add(row); - } - assertThat(actual).containsExactlyElementsOf(expected); + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect(); + if (testPkLog) { + // delay the write after collect job start, + // to make sure reading from log instead of snapshot + writeRows(conn, tablePath, rows, false); } + int expectRecords = expected.size(); + List<String> actual = collectRowsWithTimeout(rowIter, expectRecords); + assertThat(actual).containsExactlyElementsOf(expected); } @Test @@ -451,12 +427,12 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "+I[8, v8, 800, 8000]", "+I[9, v9, 900, 9000]", "+I[10, v10, 1000, 10000]"); - assertQueryResult(query, expected); + assertQueryResultExactOrder(tEnv, query, expected); // 2. read kv table with scan.startup.mode='earliest' options = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */"; query = "select a, b, c, d from " + tableName + options; - assertQueryResult(query, expected); + assertQueryResultExactOrder(tEnv, query, expected); // 3. read log table with scan.startup.mode='timestamp' expected = @@ -471,7 +447,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' ='%d') */", timestamp); query = "select a, b, c, d from " + tableName + options; - assertQueryResult(query, expected); + assertQueryResultExactOrder(tEnv, query, expected); } @Test @@ -501,20 +477,13 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "-U[2, v2]", "+U[2, v22]", "+I[4, v4]"); - try (org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql(query).collect()) { - int expectRecords = 8; - List<String> actual = new ArrayList<>(expectRecords); - // delay to write after collect job start, to make sure reading from log instead of - // snapshot - writeRows(conn, tablePath, rows2, false); - for (int i = 0; i < expectRecords; i++) { - Row r = rowIter.next(); - String row = r.toString(); - actual.add(row); - } - assertThat(actual).containsExactlyElementsOf(expected); - } + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect(); + int expectRecords = 8; + // delay to write after collect job start, to make sure reading from log instead of + // snapshot + writeRows(conn, tablePath, rows2, false); + List<String> actual = collectRowsWithTimeout(rowIter, expectRecords); + assertThat(actual).containsExactlyElementsOf(expected); } private static Stream<Arguments> readKvTableScanStartupModeArgs() { @@ -595,17 +564,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "-U[2, v2]", "+U[2, v22]", "+I[4, v4]"); - try (org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql(query).collect()) { - int expectRecords = 10; - List<String> actual = new ArrayList<>(expectRecords); - for (int i = 0; i < expectRecords; i++) { - Row r = rowIter.next(); - String row = r.toString(); - actual.add(row); - } - assertThat(actual).containsExactlyElementsOf(expected); - } + assertQueryResultExactOrder(tEnv, query, expected); } @ParameterizedTest @@ -687,25 +646,20 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "the fetch timestamp %s is larger than the current timestamp", currentTimeMillis + Duration.ofMinutes(5).toMillis())); - try (org.apache.flink.util.CloseableIterator<Row> rowIter = + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql( String.format( "select * from timestamp_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ ", currentTimeMillis)) - .collect()) { - CLOCK.advanceTime(Duration.ofMillis(100L)); - // write second batch record. - rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6")); - writeRows(conn, tablePath, rows, true); - List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]"); - int expectRecords = expected.size(); - List<String> actual = new ArrayList<>(expectRecords); - for (int i = 0; i < expectRecords; i++) { - String row = rowIter.next().toString(); - actual.add(row); - } - assertThat(actual).containsExactlyElementsOf(expected); - } + .collect(); + CLOCK.advanceTime(Duration.ofMillis(100L)); + // write second batch record. + rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6")); + writeRows(conn, tablePath, rows, true); + List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]"); + int expectRecords = expected.size(); + List<String> actual = collectRowsWithTimeout(rowIter, expectRecords); + assertThat(actual).containsExactlyElementsOf(expected); } // ------------------------------------------------------------------------------------- @@ -1319,20 +1273,6 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "Fail to wait until all bucket finish snapshot"); } - private void assertQueryResult(String query, List<String> expected) throws Exception { - try (org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql(query).collect()) { - int expectRecords = expected.size(); - List<String> actual = new ArrayList<>(expectRecords); - for (int i = 0; i < expectRecords; i++) { - Row r = rowIter.next(); - String row = r.toString(); - actual.add(row); - } - assertThat(actual).containsExactlyElementsOf(expected); - } - } - private GenericRow rowWithPartition(Object[] values, @Nullable String partition) { if (partition == null) { return row(values); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java index 092220244..87c7e14c1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java @@ -21,8 +21,12 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -62,29 +66,44 @@ public class FlinkRowAssertionsUtils { } } - private static List<String> collectRowsWithTimeout( + public static List<String> collectRowsWithTimeout( + CloseableIterator<Row> iterator, int expectedCount) { + return collectRowsWithTimeout(iterator, expectedCount, true); + } + + public static List<String> collectRowsWithTimeout( CloseableIterator<Row> iterator, int expectedCount, boolean closeIterator) { - List<String> actual = new ArrayList<>(); - long startTime = System.currentTimeMillis(); - int maxWaitTime = 60000; // 60 seconds + if (expectedCount < 0) { + throw new IllegalArgumentException( + "Expected count must be non-negative: " + expectedCount); + } + if (iterator == null) { + throw new IllegalArgumentException("Iterator cannot be null"); + } + return collectRowsWithTimeout( + iterator, + expectedCount, + closeIterator, + // max wait 1 minute + Duration.ofMinutes(1)); + } + protected static List<String> collectRowsWithTimeout( + CloseableIterator<Row> iterator, + int expectedCount, + boolean closeIterator, + Duration maxWaitTime) { + List<String> actual = new ArrayList<>(); + long startTimeMs = System.currentTimeMillis(); + long deadlineTimeMs = startTimeMs + maxWaitTime.toMillis(); try { for (int i = 0; i < expectedCount; i++) { // Wait for next record with timeout - while (!iterator.hasNext()) { - long elapsedTime = System.currentTimeMillis() - startTime; - if (elapsedTime > maxWaitTime) { - // Timeout reached - provide detailed failure info - throw new AssertionError( - String.format( - "Timeout after waiting %d ms for Flink job results. " - + "Expected %d records but only received %d. " - + "This might indicate a job hang or insufficient data generation.", - elapsedTime, expectedCount, actual.size())); - } - Thread.sleep(10); + if (!waitForNextWithTimeout( + iterator, deadlineTimeMs - System.currentTimeMillis())) { + throw timeoutError( + System.currentTimeMillis() - startTimeMs, expectedCount, actual.size()); } - if (iterator.hasNext()) { actual.add(iterator.next().toString()); } else { @@ -92,12 +111,7 @@ public class FlinkRowAssertionsUtils { break; } } - return actual; - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for Flink job results", e); } catch (AssertionError e) { // Re-throw our timeout assertion errors throw e; @@ -107,7 +121,7 @@ public class FlinkRowAssertionsUtils { // Job completed normally - return what we have return actual; } else { - long elapsedTime = System.currentTimeMillis() - startTime; + long elapsedTime = System.currentTimeMillis() - startTimeMs; throw new RuntimeException( String.format( "Unexpected error after waiting %d ms for Flink job results. " @@ -126,6 +140,29 @@ public class FlinkRowAssertionsUtils { } } + private static AssertionError timeoutError( + long elapsedTime, int expectedCount, int actualCount) { + return new AssertionError( + String.format( + "Timeout after waiting %d ms for Flink job results. " + + "Expected %d records but only received %d. " + + "This might indicate a job hang or insufficient data generation.", + elapsedTime, expectedCount, actualCount)); + } + + private static boolean waitForNextWithTimeout( + CloseableIterator<Row> iterator, long maxWaitTime) { + CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(iterator::hasNext); + try { + return future.get(maxWaitTime, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + future.cancel(true); + return false; + } catch (Exception e) { + throw new RuntimeException("Error checking iterator.hasNext()", e); + } + } + private static boolean isMiniClusterCompletionException(Exception e) { return e.getCause() instanceof IllegalStateException && e.getMessage() != null diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java new file mode 100644 index 000000000..0086adeb0 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.testutils; + +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; + +/** Test for {@link FlinkRowAssertionsUtils}. */ +class FlinkRowAssertionsUtilsTest { + + @Test + void testCollectRowsWithTimeout() { + // should throw AssertionError if wait rows timeout + assertThatThrownBy( + () -> + collectRowsWithTimeout( + createBlockingHasNextIterator(), + 10, + true, + Duration.ofSeconds(1))) + .isInstanceOf(AssertionError.class) + .hasMessageContaining("Timeout after waiting") + .hasMessageContaining( + "ms for Flink job results. Expected 10 records but only received 0."); + } + + CloseableIterator<Row> createBlockingHasNextIterator() { + return new CloseableIterator<Row>() { + @Override + public void close() throws Exception {} + + @SuppressWarnings("all") + @Override + public boolean hasNext() { + // to mock blocking + try { + while (true) { + Thread.sleep(1_000); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Thread sleeping for blocking hasNext() was interrupted."); + } + return true; + } + + @Override + public Row next() { + return null; + } + }; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java index 914ad2239..d4ac97bf0 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java @@ -43,7 +43,6 @@ import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.types.DataTypes; import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.types.Row; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -61,7 +60,6 @@ import java.util.Set; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue; -import static org.assertj.core.api.Assertions.assertThat; /** A base class for testing with Fluss cluster prepared. */ public class FlinkTestBase extends AbstractTestBase { @@ -182,18 +180,6 @@ public class FlinkTestBase extends AbstractTestBase { return admin.getTableInfo(tablePath).get().getTableId(); } - public static List<String> assertAndCollectRecords( - org.apache.flink.util.CloseableIterator<Row> iterator, int expectedNum) - throws Exception { - List<String> actual = new ArrayList<>(expectedNum); - for (int i = 0; i < expectedNum; i++) { - actual.add(iterator.next().toString()); - } - assertThat(iterator.hasNext()).isFalse(); - iterator.close(); - return actual; - } - protected void waitUntilSnapshot(long tableId, long snapshotId) { for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) { TableBucket tableBucket = new TableBucket(tableId, i);