This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new db3bfbc2f14a test: add flink mini cluster integration test for append function with buffer sort (#17972)
db3bfbc2f14a is described below

commit db3bfbc2f14a81792cea8659b6c941872dafb79f
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 23 22:40:44 2026 -0600

    test: add flink mini cluster integration test for append function with buffer sort (#17972)
---
 .../ITTestAppendWriteFunctionWithBufferSort.java   | 468 ++++++++++-----------
 1 file changed, 211 insertions(+), 257 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
index 764d6c89ab03..36fa4f3b5082 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
@@ -18,314 +18,268 @@
 
 package org.apache.hudi.sink.append;
 
-import org.apache.hudi.sink.buffer.BufferType;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.utils.TestWriteBase;
+import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.sink.buffer.BufferType;
+import org.apache.hudi.sink.utils.Pipelines;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.JsonDeserializationFunction;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.source.ContinuousFileSource;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.junit.jupiter.api.Test;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
-import java.sql.Timestamp;
-import java.util.ArrayList;
+import java.io.FileWriter;
+import java.io.PrintWriter;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Map;
+import java.util.Objects;
 
 /**
- * Test cases for append write functions with buffer sorting.
- * Tests both {@link AppendWriteFunctionWithDisruptorBufferSort} (DISRUPTOR) 
and
- * {@link AppendWriteFunctionWithBIMBufferSort} (BOUNDED_IN_MEMORY) buffer 
types.
+ * Integration tests for append write functions with buffer sorting using 
Flink MiniCluster.
+ *
+ * <p>Tests all buffer types (DISRUPTOR, BOUNDED_IN_MEMORY, NONE) with real 
Flink runtime.
+ *
+ * @see AppendWriteFunctionWithDisruptorBufferSort
+ * @see AppendWriteFunctionWithBIMBufferSort
+ * @see AppendWriteFunction
  */
-public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase {
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestAppendWriteFunctionWithBufferSort extends TestLogger {
 
-  private RowType rowType;
+  private static final Map<String, List<String>> EXPECTED = new HashMap<>();
 
-  @Override
-  protected void setUp(Configuration conf) {
-    conf.set(FlinkOptions.OPERATION, "insert");
-    conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
-    conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 100L);
-
-    List<RowType.RowField> fields = new ArrayList<>();
-    fields.add(new RowType.RowField("uuid", VarCharType.STRING_TYPE));
-    fields.add(new RowType.RowField("name", VarCharType.STRING_TYPE));
-    fields.add(new RowType.RowField("age", new IntType()));
-    fields.add(new RowType.RowField("ts", new TimestampType()));
-    fields.add(new RowType.RowField("partition", VarCharType.STRING_TYPE));
-    this.rowType = new RowType(fields);
+  static {
+    EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", 
"id2,par1,id2,Stephen,33,2000,par1"));
+    EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", 
"id4,par2,id4,Fabian,31,4000,par2"));
+    EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", 
"id6,par3,id6,Emma,20,6000,par3"));
+    EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", 
"id8,par4,id8,Han,56,8000,par4"));
   }
 
-  private void configureBufferType(BufferType bufferType) {
-    conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
-    if (bufferType == BufferType.DISRUPTOR) {
-      conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
-    }
-  }
-
-  // ==================== Common tests for both buffer types 
====================
+  @TempDir
+  File tempFile;
 
+  /**
+   * Tests basic write with sorting for all buffer types.
+   * Verifies core functionality: records are written and sorted correctly.
+   */
   @ParameterizedTest
-  @EnumSource(value = BufferType.class, names = {"DISRUPTOR", 
"BOUNDED_IN_MEMORY"})
-  public void testBufferFlushOnRecordNumberLimit(BufferType bufferType) throws 
Exception {
-    configureBufferType(bufferType);
+  @EnumSource(BufferType.class)
+  void testBasicWriteWithSorting(BufferType bufferType) throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    conf.set(FlinkOptions.OPERATION, "insert");
+    conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
+    conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
 
-    // Create test data that exceeds buffer size (150 records > 100 buffer 
size)
-    List<RowData> inputData = new ArrayList<>();
-    for (int i = 0; i < 150; i++) {
-      inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 
00:00:01.123", "p1"));
+    if (bufferType == BufferType.DISRUPTOR) {
+      conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
     }
 
-    // Write the data
-    preparePipeline(conf)
-        .consume(inputData)
-        .endInput();
-
-    // Verify all data was written
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(150, actualData.size());
+    executeAndVerify(conf, "basic_write_" + bufferType.name().toLowerCase(), 
1, EXPECTED);
   }
 
+  /**
+   * Tests small buffer with 100 records for DISRUPTOR and BOUNDED_IN_MEMORY.
+   * With buffer size 32 and 100 records, triggers multiple flushes to test 
buffer management.
+   */
   @ParameterizedTest
   @EnumSource(value = BufferType.class, names = {"DISRUPTOR", 
"BOUNDED_IN_MEMORY"})
-  public void testBufferFlushOnCheckpoint(BufferType bufferType) throws 
Exception {
-    configureBufferType(bufferType);
-
-    // Create test data
-    List<RowData> inputData = Arrays.asList(
-        createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"),
-        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1")
-    );
-
-    // Write the data and flush on checkpoint
-    preparePipeline(conf)
-        .consume(inputData)
-        .checkpoint(1)
-        .endInput();
-
-    // Verify all data was written
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(2, actualData.size());
-  }
+  void testSmallBufferFlush(BufferType bufferType) throws Exception {
+    // Generate 100 records across 4 partitions
+    int recordCount = 100;
+    // Create source file in a separate directory from the Hudi table
+    File sourceDir = new File(tempFile, "source");
+    sourceDir.mkdirs();
+    File sourceFile = new File(sourceDir, "test_100_records.data");
 
-  @ParameterizedTest
-  @EnumSource(value = BufferType.class, names = {"DISRUPTOR", 
"BOUNDED_IN_MEMORY"})
-  public void testSortedResult(BufferType bufferType) throws Exception {
-    configureBufferType(bufferType);
-
-    // Create test data with unsorted records (sort keys: name, age)
-    List<RowData> inputData = Arrays.asList(
-        createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"),
-        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"),
-        createRowData("uuid3", "Bob", 21, "1970-01-01 00:00:31.124", "p1")
-    );
-
-    // Expected order after sorting by name, then age
-    List<String> expected = Arrays.asList(
-        "uuid2,Alice,25,1970-01-01 00:00:01.124,p1",
-        "uuid3,Bob,21,1970-01-01 00:00:31.124,p1",
-        "uuid1,Bob,30,1970-01-01 00:00:01.123,p1");
-
-    // Write the data
-    preparePipeline(conf)
-        .consume(inputData)
-        .checkpoint(1)
-        .endInput();
-
-    // Verify data is sorted correctly
-    List<GenericRecord> result = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(3, result.size());
-
-    List<String> filteredResult = result.stream()
-        .map(TestData::filterOutVariablesWithoutHudiMetadata)
-        .collect(Collectors.toList());
-
-    assertArrayEquals(expected.toArray(), filteredResult.toArray());
-  }
+    // Create Hudi table in a separate directory
+    File tableDir = new File(tempFile, "table");
+    tableDir.mkdirs();
 
-  @ParameterizedTest
-  @EnumSource(value = BufferType.class, names = {"DISRUPTOR", 
"BOUNDED_IN_MEMORY"})
-  public void testMultipleCheckpoints(BufferType bufferType) throws Exception {
-    configureBufferType(bufferType);
-
-    // Create first batch of test data
-    List<RowData> batch1 = Arrays.asList(
-        createRowData("uuid1", "Charlie", 35, "1970-01-01 00:00:01.123", "p1"),
-        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1")
-    );
-
-    // Create second batch of test data
-    List<RowData> batch2 = Arrays.asList(
-        createRowData("uuid3", "Bob", 30, "1970-01-01 00:00:01.125", "p1"),
-        createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
-    );
-
-    // Write batches with checkpoints between them
-    TestHarness testHarness = preparePipeline(conf);
-
-    testHarness.consume(batch1).checkpoint(1);
-    testHarness.consume(batch2).checkpoint(2);
-    testHarness.endInput();
-
-    // Verify all data from both batches was written
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(4, actualData.size());
-  }
+    generateTestDataFile(sourceFile, recordCount);
 
-  @ParameterizedTest
-  @EnumSource(value = BufferType.class, names = {"DISRUPTOR", 
"BOUNDED_IN_MEMORY"})
-  public void testLargeDatasetWithMultipleFlushes(BufferType bufferType) 
throws Exception {
-    configureBufferType(bufferType);
-    // Configure small buffer to force multiple flushes
-    conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 50L);
-
-    // Create large dataset across multiple partitions
-    List<RowData> inputData = new ArrayList<>();
-    for (int i = 0; i < 500; i++) {
-      inputData.add(createRowData("uuid" + i, "Name" + (i % 10), i % 100, 
"1970-01-01 00:00:01.123", "p" + (i % 3)));
-    }
+    Configuration conf = 
TestConfigurations.getDefaultConf(tableDir.toURI().toString());
+    conf.set(FlinkOptions.OPERATION, "insert");
+    conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
+    conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
+    conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 32L);
 
-    // Write the data
-    preparePipeline(conf)
-        .consume(inputData)
-        .endInput();
+    if (bufferType == BufferType.DISRUPTOR) {
+      conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 64);
+    }
 
-    // Verify all data was written across all partitions
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 3);
-    assertEquals(500, actualData.size());
+    executeAndVerifyRecordCount(conf, "small_buffer_" + 
bufferType.name().toLowerCase(),
+        sourceFile.toURI().toString(), 1, recordCount, tableDir);
   }
 
+  /**
+   * Tests concurrent write with multiple parallel tasks for all buffer types.
+   * Verifies that buffer management works correctly under concurrent access.
+   */
   @ParameterizedTest
-  @EnumSource(value = BufferType.class, names = {"DISRUPTOR", 
"BOUNDED_IN_MEMORY"})
-  public void testSortStabilityWithDuplicateKeys(BufferType bufferType) throws 
Exception {
-    configureBufferType(bufferType);
-
-    // Create test data with duplicate sort keys (same name and age for first 
3 records)
-    List<RowData> inputData = Arrays.asList(
-        createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
-        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"),
-        createRowData("uuid3", "Alice", 25, "1970-01-01 00:00:01.125", "p1"),
-        createRowData("uuid4", "Bob", 30, "1970-01-01 00:00:01.126", "p1")
-    );
-
-    // Write the data and flush on checkpoint
-    preparePipeline(conf)
-        .consume(inputData)
-        .checkpoint(1)
-        .endInput();
-
-    // Verify all data was written and sorted (Alice records before Bob)
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(4, actualData.size());
-
-    List<String> filteredResult = actualData.stream()
-        .map(TestData::filterOutVariablesWithoutHudiMetadata)
-        .collect(Collectors.toList());
-
-    assertTrue(filteredResult.get(0).contains("Alice"));
-    assertTrue(filteredResult.get(1).contains("Alice"));
-    assertTrue(filteredResult.get(2).contains("Alice"));
-    assertTrue(filteredResult.get(3).contains("Bob"));
-  }
+  @EnumSource(BufferType.class)
+  void testConcurrentWrite(BufferType bufferType) throws Exception {
+    // Generate 200 records across 4 partitions (50 per partition)
+    int recordCount = 200;
+    File sourceDir = new File(tempFile, "source");
+    sourceDir.mkdirs();
+    File sourceFile = new File(sourceDir, "test_200_records.data");
 
-  @ParameterizedTest
-  @EnumSource(value = BufferType.class, names = {"DISRUPTOR", 
"BOUNDED_IN_MEMORY"})
-  public void testDifferentPartitions(BufferType bufferType) throws Exception {
-    configureBufferType(bufferType);
-
-    // Create test data across different partitions
-    List<RowData> inputData = Arrays.asList(
-        createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
-        createRowData("uuid2", "Bob", 30, "1970-01-01 00:00:01.124", "p2"),
-        createRowData("uuid3", "Charlie", 35, "1970-01-01 00:00:01.125", "p3"),
-        createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
-    );
-
-    // Write the data and flush on checkpoint
-    preparePipeline(conf)
-        .consume(inputData)
-        .checkpoint(1)
-        .endInput();
-
-    // Verify all data was written across all partitions
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 3);
-    assertEquals(4, actualData.size());
-  }
+    File tableDir = new File(tempFile, "table");
+    tableDir.mkdirs();
+
+    generateTestDataFile(sourceFile, recordCount);
 
-  // ==================== BOUNDED_IN_MEMORY-specific tests ====================
+    Configuration conf = 
TestConfigurations.getDefaultConf(tableDir.toURI().toString());
+    conf.set(FlinkOptions.OPERATION, "insert");
+    conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
+    conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
 
-  @Test
-  public void testBIMBufferFlushOnBufferSizeLimit() throws Exception {
-    configureBufferType(BufferType.BOUNDED_IN_MEMORY);
-    // Configure large record count limit but small memory limit to trigger 
memory-based flush
-    conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10000L);
-    conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 400.1D);
+    // Enable OCC with InProcessLockProvider for concurrent write testing
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+        WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+        InProcessLockProvider.class.getName());
 
-    // Create large dataset
-    List<RowData> inputData = new ArrayList<>();
-    for (int i = 0; i < 2000; i++) {
-      inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 
00:00:01.123", "p1"));
+    if (bufferType == BufferType.DISRUPTOR) {
+      conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
     }
 
-    // Write the data
-    preparePipeline(conf)
-        .consume(inputData)
-        .endInput();
+    executeAndVerifyRecordCount(conf, "concurrent_write_" + 
bufferType.name().toLowerCase(),
+        sourceFile.toURI().toString(), 1, recordCount, tableDir);
+  }
 
-    // Verify all data was written
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(2000, actualData.size());
+  /**
+   * Generates test data file with specified record count.
+   */
+  private void generateTestDataFile(File outputFile, int recordCount) throws 
Exception {
+    String[] partitions = {"par1", "par2", "par3", "par4"};
+    String[] names = {"Alice", "Bob", "Charlie", "Diana", "Emma", "Frank", 
"Grace", "Han"};
+
+    try (PrintWriter writer = new PrintWriter(new FileWriter(outputFile))) {
+      for (int i = 0; i < recordCount; i++) {
+        String partition = partitions[i % partitions.length];
+        String name = names[i % names.length];
+        int age = 20 + (i % 50);
+        int seconds = (i % 60) + 1;
+        String uuid = "id" + i;
+
+        // Write JSON line (ts format matches original test_source.data)
+        writer.printf("{\"uuid\": \"%s\", \"name\": \"%s\", \"age\": %d, 
\"ts\": \"1970-01-01T00:00:%02d\", \"partition\": \"%s\"}%n",
+            uuid, name, age, seconds, partition);
+      }
+    }
   }
 
-  @Test
-  public void testBIMConcurrentWriteScenario() throws Exception {
-    configureBufferType(BufferType.BOUNDED_IN_MEMORY);
-    // Configure small buffer to trigger frequent async writes
-    conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 20L);
+  /**
+   * Tests different Disruptor wait strategies.
+   * Each strategy has different CPU/latency tradeoffs.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = {"BLOCKING_WAIT", "SLEEPING_WAIT", "YIELDING_WAIT"})
+  void testDisruptorWaitStrategies(String waitStrategy) throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    conf.set(FlinkOptions.OPERATION, "insert");
+    conf.set(FlinkOptions.WRITE_BUFFER_TYPE, BufferType.DISRUPTOR.name());
+    conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
+    conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_WAIT_STRATEGY, waitStrategy);
+    conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
 
-    // Create test data
-    List<RowData> inputData = new ArrayList<>();
-    for (int i = 0; i < 200; i++) {
-      inputData.add(createRowData("uuid" + i, "Name" + (i % 5), i % 50, 
"1970-01-01 00:00:01.123", "p1"));
-    }
+    executeAndVerify(conf, "disruptor_wait_" + waitStrategy.toLowerCase(), 1, 
EXPECTED);
+  }
 
-    // Write data in small batches with periodic checkpoints
-    TestHarness testHarness = preparePipeline(conf);
+  private void executeAndVerify(
+      Configuration conf,
+      String jobName,
+      int checkpoints,
+      Map<String, List<String>> expected) throws Exception {
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+    executeAndVerifyWithSource(conf, jobName, sourcePath, checkpoints, 
expected, tempFile);
+  }
 
-    for (int i = 0; i < inputData.size(); i += 10) {
-      List<RowData> batch = inputData.subList(i, Math.min(i + 10, 
inputData.size()));
-      testHarness.consume(batch);
-      if (i % 50 == 0) {
-        testHarness.checkpoint(i / 50 + 1);
-      }
-    }
-    testHarness.endInput();
+  private void executeAndVerifyWithSource(
+      Configuration conf,
+      String jobName,
+      String sourcePath,
+      int checkpoints,
+      Map<String, List<String>> expected,
+      File tableDir) throws Exception {
 
-    // Verify all data was written
-    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(200, actualData.size());
+    runPipeline(conf, jobName, sourcePath, checkpoints);
+    TestData.checkWrittenDataCOW(tableDir, expected);
   }
 
-  // ==================== Helper methods ====================
+  private void executeAndVerifyRecordCount(
+      Configuration conf,
+      String jobName,
+      String sourcePath,
+      int checkpoints,
+      int expectedCount,
+      File tableDir) throws Exception {
+
+    runPipeline(conf, jobName, sourcePath, checkpoints);
+
+    // Count records in all partitions
+    RowType rowType =
+        (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+    List<org.apache.avro.generic.GenericRecord> records = 
TestData.readAllData(tableDir, rowType, 4);
+    org.junit.jupiter.api.Assertions.assertEquals(expectedCount, 
records.size(),
+        "Expected " + expectedCount + " records but found " + records.size());
+  }
 
-  private GenericRowData createRowData(String uuid, String name, int age, 
String timestamp, String partition) {
-    return GenericRowData.of(StringData.fromString(uuid), 
StringData.fromString(name),
-        age, TimestampData.fromTimestamp(Timestamp.valueOf(timestamp)), 
StringData.fromString(partition));
+  private void runPipeline(
+      Configuration conf,
+      String jobName,
+      String sourcePath,
+      int checkpoints) throws Exception {
+
+    Configuration envConf = new Configuration();
+    envConf.setString("execution.checkpointing.interval", "4s");
+    envConf.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
+    envConf.setString("execution.checkpointing.max-concurrent-checkpoints", 
"1");
+
+    StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment(envConf);
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM);
+
+    RowType rowType =
+        (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    DataStream<RowData> dataStream = execEnv
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new 
Path(sourcePath), checkpoints))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(JsonDeserializationFunction.getInstance(rowType))
+        .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM);
+
+    OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
+    DataStream<RowData> pipeline = Pipelines.append(conf, rowType, dataStream);
+    execEnv.addOperator(pipeline.getTransformation());
+
+    execEnv.execute(jobName);
   }
 }

Reply via email to