This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new db3bfbc2f14a test: add flink mini cluster integration test for append function with buffer sort (#17972)
db3bfbc2f14a is described below
commit db3bfbc2f14a81792cea8659b6c941872dafb79f
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 23 22:40:44 2026 -0600
test: add flink mini cluster integration test for append function with buffer sort (#17972)
---
.../ITTestAppendWriteFunctionWithBufferSort.java | 468 ++++++++++-----------
1 file changed, 211 insertions(+), 257 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
index 764d6c89ab03..36fa4f3b5082 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
@@ -18,314 +18,268 @@
package org.apache.hudi.sink.append;
-import org.apache.hudi.sink.buffer.BufferType;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.utils.TestWriteBase;
+import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.sink.buffer.BufferType;
+import org.apache.hudi.sink.utils.Pipelines;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.JsonDeserializationFunction;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.source.ContinuousFileSource;
-import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.junit.jupiter.api.Test;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
-import java.sql.Timestamp;
-import java.util.ArrayList;
+import java.io.FileWriter;
+import java.io.PrintWriter;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Map;
+import java.util.Objects;
/**
- * Test cases for append write functions with buffer sorting.
- * Tests both {@link AppendWriteFunctionWithDisruptorBufferSort} (DISRUPTOR) and
- * {@link AppendWriteFunctionWithBIMBufferSort} (BOUNDED_IN_MEMORY) buffer types.
+ * Integration tests for append write functions with buffer sorting using Flink MiniCluster.
+ *
+ * <p>Tests all buffer types (DISRUPTOR, BOUNDED_IN_MEMORY, NONE) with real Flink runtime.
+ *
+ * @see AppendWriteFunctionWithDisruptorBufferSort
+ * @see AppendWriteFunctionWithBIMBufferSort
+ * @see AppendWriteFunction
*/
-public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase {
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestAppendWriteFunctionWithBufferSort extends TestLogger {
- private RowType rowType;
+ private static final Map<String, List<String>> EXPECTED = new HashMap<>();
- @Override
- protected void setUp(Configuration conf) {
- conf.set(FlinkOptions.OPERATION, "insert");
- conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
- conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 100L);
-
- List<RowType.RowField> fields = new ArrayList<>();
- fields.add(new RowType.RowField("uuid", VarCharType.STRING_TYPE));
- fields.add(new RowType.RowField("name", VarCharType.STRING_TYPE));
- fields.add(new RowType.RowField("age", new IntType()));
- fields.add(new RowType.RowField("ts", new TimestampType()));
- fields.add(new RowType.RowField("partition", VarCharType.STRING_TYPE));
- this.rowType = new RowType(fields);
+ static {
+ EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
+ EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
+ EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
+ EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
}
- private void configureBufferType(BufferType bufferType) {
- conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
- if (bufferType == BufferType.DISRUPTOR) {
- conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
- }
- }
-
- // ==================== Common tests for both buffer types ====================
+ @TempDir
+ File tempFile;
+ /**
+ * Tests basic write with sorting for all buffer types.
+ * Verifies core functionality: records are written and sorted correctly.
+ */
@ParameterizedTest
- @EnumSource(value = BufferType.class, names = {"DISRUPTOR", "BOUNDED_IN_MEMORY"})
- public void testBufferFlushOnRecordNumberLimit(BufferType bufferType) throws Exception {
- configureBufferType(bufferType);
+ @EnumSource(BufferType.class)
+ void testBasicWriteWithSorting(BufferType bufferType) throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ conf.set(FlinkOptions.OPERATION, "insert");
+ conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
+ conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
- // Create test data that exceeds buffer size (150 records > 100 buffer size)
- List<RowData> inputData = new ArrayList<>();
- for (int i = 0; i < 150; i++) {
- inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1"));
+ if (bufferType == BufferType.DISRUPTOR) {
+ conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
}
- // Write the data
- preparePipeline(conf)
- .consume(inputData)
- .endInput();
-
- // Verify all data was written
- List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(150, actualData.size());
+ executeAndVerify(conf, "basic_write_" + bufferType.name().toLowerCase(), 1, EXPECTED);
}
+ /**
+ * Tests small buffer with 100 records for DISRUPTOR and BOUNDED_IN_MEMORY.
+ * With buffer size 32 and 100 records, triggers multiple flushes to test buffer management.
+ */
@ParameterizedTest
@EnumSource(value = BufferType.class, names = {"DISRUPTOR", "BOUNDED_IN_MEMORY"})
- public void testBufferFlushOnCheckpoint(BufferType bufferType) throws Exception {
- configureBufferType(bufferType);
-
- // Create test data
- List<RowData> inputData = Arrays.asList(
- createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"),
- createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1")
- );
-
- // Write the data and flush on checkpoint
- preparePipeline(conf)
- .consume(inputData)
- .checkpoint(1)
- .endInput();
-
- // Verify all data was written
- List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(2, actualData.size());
- }
+ void testSmallBufferFlush(BufferType bufferType) throws Exception {
+ // Generate 100 records across 4 partitions
+ int recordCount = 100;
+ // Create source file in a separate directory from the Hudi table
+ File sourceDir = new File(tempFile, "source");
+ sourceDir.mkdirs();
+ File sourceFile = new File(sourceDir, "test_100_records.data");
- @ParameterizedTest
- @EnumSource(value = BufferType.class, names = {"DISRUPTOR", "BOUNDED_IN_MEMORY"})
- public void testSortedResult(BufferType bufferType) throws Exception {
- configureBufferType(bufferType);
-
- // Create test data with unsorted records (sort keys: name, age)
- List<RowData> inputData = Arrays.asList(
- createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"),
- createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"),
- createRowData("uuid3", "Bob", 21, "1970-01-01 00:00:31.124", "p1")
- );
-
- // Expected order after sorting by name, then age
- List<String> expected = Arrays.asList(
- "uuid2,Alice,25,1970-01-01 00:00:01.124,p1",
- "uuid3,Bob,21,1970-01-01 00:00:31.124,p1",
- "uuid1,Bob,30,1970-01-01 00:00:01.123,p1");
-
- // Write the data
- preparePipeline(conf)
- .consume(inputData)
- .checkpoint(1)
- .endInput();
-
- // Verify data is sorted correctly
- List<GenericRecord> result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(3, result.size());
-
- List<String> filteredResult = result.stream()
- .map(TestData::filterOutVariablesWithoutHudiMetadata)
- .collect(Collectors.toList());
-
- assertArrayEquals(expected.toArray(), filteredResult.toArray());
- }
+ // Create Hudi table in a separate directory
+ File tableDir = new File(tempFile, "table");
+ tableDir.mkdirs();
- @ParameterizedTest
- @EnumSource(value = BufferType.class, names = {"DISRUPTOR", "BOUNDED_IN_MEMORY"})
- public void testMultipleCheckpoints(BufferType bufferType) throws Exception {
- configureBufferType(bufferType);
-
- // Create first batch of test data
- List<RowData> batch1 = Arrays.asList(
- createRowData("uuid1", "Charlie", 35, "1970-01-01 00:00:01.123", "p1"),
- createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1")
- );
-
- // Create second batch of test data
- List<RowData> batch2 = Arrays.asList(
- createRowData("uuid3", "Bob", 30, "1970-01-01 00:00:01.125", "p1"),
- createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
- );
-
- // Write batches with checkpoints between them
- TestHarness testHarness = preparePipeline(conf);
-
- testHarness.consume(batch1).checkpoint(1);
- testHarness.consume(batch2).checkpoint(2);
- testHarness.endInput();
-
- // Verify all data from both batches was written
- List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(4, actualData.size());
- }
+ generateTestDataFile(sourceFile, recordCount);
- @ParameterizedTest
- @EnumSource(value = BufferType.class, names = {"DISRUPTOR", "BOUNDED_IN_MEMORY"})
- public void testLargeDatasetWithMultipleFlushes(BufferType bufferType) throws Exception {
- configureBufferType(bufferType);
- // Configure small buffer to force multiple flushes
- conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 50L);
-
- // Create large dataset across multiple partitions
- List<RowData> inputData = new ArrayList<>();
- for (int i = 0; i < 500; i++) {
- inputData.add(createRowData("uuid" + i, "Name" + (i % 10), i % 100, "1970-01-01 00:00:01.123", "p" + (i % 3)));
- }
+ Configuration conf = TestConfigurations.getDefaultConf(tableDir.toURI().toString());
+ conf.set(FlinkOptions.OPERATION, "insert");
+ conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
+ conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
+ conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 32L);
- // Write the data
- preparePipeline(conf)
- .consume(inputData)
- .endInput();
+ if (bufferType == BufferType.DISRUPTOR) {
+ conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 64);
+ }
- // Verify all data was written across all partitions
- List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 3);
- assertEquals(500, actualData.size());
+ executeAndVerifyRecordCount(conf, "small_buffer_" + bufferType.name().toLowerCase(),
+ sourceFile.toURI().toString(), 1, recordCount, tableDir);
}
+ /**
+ * Tests concurrent write with multiple parallel tasks for all buffer types.
+ * Verifies that buffer management works correctly under concurrent access.
+ */
@ParameterizedTest
- @EnumSource(value = BufferType.class, names = {"DISRUPTOR", "BOUNDED_IN_MEMORY"})
- public void testSortStabilityWithDuplicateKeys(BufferType bufferType) throws Exception {
- configureBufferType(bufferType);
-
- // Create test data with duplicate sort keys (same name and age for first 3 records)
- List<RowData> inputData = Arrays.asList(
- createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
- createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"),
- createRowData("uuid3", "Alice", 25, "1970-01-01 00:00:01.125", "p1"),
- createRowData("uuid4", "Bob", 30, "1970-01-01 00:00:01.126", "p1")
- );
-
- // Write the data and flush on checkpoint
- preparePipeline(conf)
- .consume(inputData)
- .checkpoint(1)
- .endInput();
-
- // Verify all data was written and sorted (Alice records before Bob)
- List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(4, actualData.size());
-
- List<String> filteredResult = actualData.stream()
- .map(TestData::filterOutVariablesWithoutHudiMetadata)
- .collect(Collectors.toList());
-
- assertTrue(filteredResult.get(0).contains("Alice"));
- assertTrue(filteredResult.get(1).contains("Alice"));
- assertTrue(filteredResult.get(2).contains("Alice"));
- assertTrue(filteredResult.get(3).contains("Bob"));
- }
+ @EnumSource(BufferType.class)
+ void testConcurrentWrite(BufferType bufferType) throws Exception {
+ // Generate 200 records across 4 partitions (50 per partition)
+ int recordCount = 200;
+ File sourceDir = new File(tempFile, "source");
+ sourceDir.mkdirs();
+ File sourceFile = new File(sourceDir, "test_200_records.data");
- @ParameterizedTest
- @EnumSource(value = BufferType.class, names = {"DISRUPTOR", "BOUNDED_IN_MEMORY"})
- public void testDifferentPartitions(BufferType bufferType) throws Exception {
- configureBufferType(bufferType);
-
- // Create test data across different partitions
- List<RowData> inputData = Arrays.asList(
- createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
- createRowData("uuid2", "Bob", 30, "1970-01-01 00:00:01.124", "p2"),
- createRowData("uuid3", "Charlie", 35, "1970-01-01 00:00:01.125", "p3"),
- createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
- );
-
- // Write the data and flush on checkpoint
- preparePipeline(conf)
- .consume(inputData)
- .checkpoint(1)
- .endInput();
-
- // Verify all data was written across all partitions
- List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 3);
- assertEquals(4, actualData.size());
- }
+ File tableDir = new File(tempFile, "table");
+ tableDir.mkdirs();
+
+ generateTestDataFile(sourceFile, recordCount);
- // ==================== BOUNDED_IN_MEMORY-specific tests ====================
+ Configuration conf = TestConfigurations.getDefaultConf(tableDir.toURI().toString());
+ conf.set(FlinkOptions.OPERATION, "insert");
+ conf.set(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name());
+ conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
- @Test
- public void testBIMBufferFlushOnBufferSizeLimit() throws Exception {
- configureBufferType(BufferType.BOUNDED_IN_MEMORY);
- // Configure large record count limit but small memory limit to trigger memory-based flush
- conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10000L);
- conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 400.1D);
+ // Enable OCC with InProcessLockProvider for concurrent write testing
+ conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+ conf.setString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+ InProcessLockProvider.class.getName());
- // Create large dataset
- List<RowData> inputData = new ArrayList<>();
- for (int i = 0; i < 2000; i++) {
- inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1"));
+ if (bufferType == BufferType.DISRUPTOR) {
+ conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
}
- // Write the data
- preparePipeline(conf)
- .consume(inputData)
- .endInput();
+ executeAndVerifyRecordCount(conf, "concurrent_write_" + bufferType.name().toLowerCase(),
+ sourceFile.toURI().toString(), 1, recordCount, tableDir);
+ }
- // Verify all data was written
- List<GenericRecord> actualData = TestData.readAllData(new
File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(2000, actualData.size());
+ /**
+ * Generates test data file with specified record count.
+ */
+ private void generateTestDataFile(File outputFile, int recordCount) throws Exception {
+ String[] partitions = {"par1", "par2", "par3", "par4"};
+ String[] names = {"Alice", "Bob", "Charlie", "Diana", "Emma", "Frank", "Grace", "Han"};
+
+ try (PrintWriter writer = new PrintWriter(new FileWriter(outputFile))) {
+ for (int i = 0; i < recordCount; i++) {
+ String partition = partitions[i % partitions.length];
+ String name = names[i % names.length];
+ int age = 20 + (i % 50);
+ int seconds = (i % 60) + 1;
+ String uuid = "id" + i;
+
+ // Write JSON line (ts format matches original test_source.data)
+ writer.printf("{\"uuid\": \"%s\", \"name\": \"%s\", \"age\": %d, \"ts\": \"1970-01-01T00:00:%02d\", \"partition\": \"%s\"}%n",
+ uuid, name, age, seconds, partition);
+ }
+ }
}
- @Test
- public void testBIMConcurrentWriteScenario() throws Exception {
- configureBufferType(BufferType.BOUNDED_IN_MEMORY);
- // Configure small buffer to trigger frequent async writes
- conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 20L);
+ /**
+ * Tests different Disruptor wait strategies.
+ * Each strategy has different CPU/latency tradeoffs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"BLOCKING_WAIT", "SLEEPING_WAIT", "YIELDING_WAIT"})
+ void testDisruptorWaitStrategies(String waitStrategy) throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ conf.set(FlinkOptions.OPERATION, "insert");
+ conf.set(FlinkOptions.WRITE_BUFFER_TYPE, BufferType.DISRUPTOR.name());
+ conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_RING_SIZE, 1024);
+ conf.set(FlinkOptions.WRITE_BUFFER_DISRUPTOR_WAIT_STRATEGY, waitStrategy);
+ conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age");
- // Create test data
- List<RowData> inputData = new ArrayList<>();
- for (int i = 0; i < 200; i++) {
- inputData.add(createRowData("uuid" + i, "Name" + (i % 5), i % 50, "1970-01-01 00:00:01.123", "p1"));
- }
+ executeAndVerify(conf, "disruptor_wait_" + waitStrategy.toLowerCase(), 1, EXPECTED);
+ }
- // Write data in small batches with periodic checkpoints
- TestHarness testHarness = preparePipeline(conf);
+ private void executeAndVerify(
+ Configuration conf,
+ String jobName,
+ int checkpoints,
+ Map<String, List<String>> expected) throws Exception {
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_source.data")).toString();
+ executeAndVerifyWithSource(conf, jobName, sourcePath, checkpoints, expected, tempFile);
+ }
- for (int i = 0; i < inputData.size(); i += 10) {
- List<RowData> batch = inputData.subList(i, Math.min(i + 10, inputData.size()));
- testHarness.consume(batch);
- if (i % 50 == 0) {
- testHarness.checkpoint(i / 50 + 1);
- }
- }
- testHarness.endInput();
+ private void executeAndVerifyWithSource(
+ Configuration conf,
+ String jobName,
+ String sourcePath,
+ int checkpoints,
+ Map<String, List<String>> expected,
+ File tableDir) throws Exception {
- // Verify all data was written
- List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(200, actualData.size());
+ runPipeline(conf, jobName, sourcePath, checkpoints);
+ TestData.checkWrittenDataCOW(tableDir, expected);
}
- // ==================== Helper methods ====================
+ private void executeAndVerifyRecordCount(
+ Configuration conf,
+ String jobName,
+ String sourcePath,
+ int checkpoints,
+ int expectedCount,
+ File tableDir) throws Exception {
+
+ runPipeline(conf, jobName, sourcePath, checkpoints);
+
+ // Count records in all partitions
+ RowType rowType =
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ .getLogicalType();
+ List<org.apache.avro.generic.GenericRecord> records = TestData.readAllData(tableDir, rowType, 4);
+ org.junit.jupiter.api.Assertions.assertEquals(expectedCount, records.size(),
+ "Expected " + expectedCount + " records but found " + records.size());
+ }
- private GenericRowData createRowData(String uuid, String name, int age, String timestamp, String partition) {
- return GenericRowData.of(StringData.fromString(uuid), StringData.fromString(name),
- age, TimestampData.fromTimestamp(Timestamp.valueOf(timestamp)), StringData.fromString(partition));
+ private void runPipeline(
+ Configuration conf,
+ String jobName,
+ String sourcePath,
+ int checkpoints) throws Exception {
+
+ Configuration envConf = new Configuration();
+ envConf.setString("execution.checkpointing.interval", "4s");
+ envConf.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
+ envConf.setString("execution.checkpointing.max-concurrent-checkpoints", "1");
+
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(envConf);
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM);
+
+ RowType rowType =
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ .getLogicalType();
+
+ DataStream<RowData> dataStream = execEnv
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(JsonDeserializationFunction.getInstance(rowType))
+ .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM);
+
+ OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
+ DataStream<RowData> pipeline = Pipelines.append(conf, rowType, dataStream);
+ execEnv.addOperator(pipeline.getTransformation());
+
+ execEnv.execute(jobName);
}
}