This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f5d1bb84e29b [HUDI-9784] Adding mysql binlog validation for debezium (#13837)
f5d1bb84e29b is described below
commit f5d1bb84e29b7637f58d6434d4c0c8c945ea3e7a
Author: Alex R <[email protected]>
AuthorDate: Thu Sep 4 16:29:43 2025 -0700
[HUDI-9784] Adding mysql binlog validation for debezium (#13837)
---
.../sources/debezium/MysqlDebeziumSource.java | 7 +++++
.../sources/debezium/TestMysqlDebeziumSource.java | 36 ++++++++++++++++++++++
2 files changed, 43 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java
index 79c990c10d5d..399b94fb959b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources.debezium;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -97,6 +98,12 @@ public class MysqlDebeziumSource extends DebeziumSource {
}
private static String generateUniqueSequence(String fileId, Long pos) {
+ // Minimal validations to ensure fileId and pos are valid.
+ if (fileId == null || fileId.trim().isEmpty() || pos == null || pos < 0) {
+ throw new HoodieReadFromSourceException(
+ String.format("Invalid binlog file information from Debezium: fileId=%s, pos=%s", fileId, pos));
+ }
+
return fileId.substring(fileId.lastIndexOf('.') + 1).concat("." + pos);
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java
index 1d09cc8e4aae..7ceafba814a7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java
@@ -19,13 +19,20 @@
package org.apache.hudi.utilities.sources.debezium;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMysqlDebeziumSource extends TestAbstractDebeziumSource {
@@ -93,4 +100,33 @@ public class TestMysqlDebeziumSource extends TestAbstractDebeziumSource {
assertTrue(records.select(DebeziumConstants.ADDED_SEQ_COL_NAME).collectAsList().stream()
.allMatch(r -> r.getString(0).equals(EXPECTED_TEST_SEQ)));
}
+
+ private String invokeGenerateUniqueSequence(String fileId, Long pos) throws
Exception {
+ Method method =
MysqlDebeziumSource.class.getDeclaredMethod("generateUniqueSequence",
String.class, Long.class);
+ method.setAccessible(true);
+ try {
+ return (String) method.invoke(null, fileId, pos);
+ } catch (InvocationTargetException e) {
+ throw (Exception) e.getCause();
+ }
+ }
+
+ @Test
+ public void testGenerateUniqueSequenceValid() throws Exception {
+ assertEquals("288947.12345",
invokeGenerateUniqueSequence("mysql-bin-changelog.288947", 12345L));
+ assertEquals("binlog.0", invokeGenerateUniqueSequence("binlog", 0L));
+ }
+
+ @Test
+ public void testGenerateUniqueSequenceInvalidFileId() {
+ assertThrows(HoodieReadFromSourceException.class, () ->
invokeGenerateUniqueSequence(null, 12345L));
+ assertThrows(HoodieReadFromSourceException.class, () ->
invokeGenerateUniqueSequence("", 0L));
+ assertThrows(HoodieReadFromSourceException.class, () ->
invokeGenerateUniqueSequence(" ", 12345L));
+ }
+
+ @Test
+ public void testGenerateUniqueSequenceInvalidPos() {
+ assertThrows(HoodieReadFromSourceException.class, () ->
invokeGenerateUniqueSequence("mysql-bin.288947", null));
+ assertThrows(HoodieReadFromSourceException.class, () ->
invokeGenerateUniqueSequence("mysql-bin.288947", -1L));
+ }
}