This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f5d1bb84e29b [HUDI-9784] Adding mysql binlog validation for debezium (#13837)
f5d1bb84e29b is described below

commit f5d1bb84e29b7637f58d6434d4c0c8c945ea3e7a
Author: Alex R <[email protected]>
AuthorDate: Thu Sep 4 16:29:43 2025 -0700

    [HUDI-9784] Adding mysql binlog validation for debezium (#13837)
---
 .../sources/debezium/MysqlDebeziumSource.java      |  7 +++++
 .../sources/debezium/TestMysqlDebeziumSource.java  | 36 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java
index 79c990c10d5d..399b94fb959b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/MysqlDebeziumSource.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources.debezium;

 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;

@@ -97,6 +98,12 @@ public class MysqlDebeziumSource extends DebeziumSource {
   }

   private static String generateUniqueSequence(String fileId, Long pos) {
+    // Minimal validations to ensure fileId and pos are valid.
+    if (fileId == null || fileId.trim().isEmpty() || pos == null || pos < 0) {
+      throw new HoodieReadFromSourceException(
+          String.format("Invalid binlog file information from Debezium: fileId=%s, pos=%s", fileId, pos));
+    }
+
     return fileId.substring(fileId.lastIndexOf('.') + 1).concat("." + pos);
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java
index 1d09cc8e4aae..7ceafba814a7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestMysqlDebeziumSource.java
@@ -19,13 +19,20 @@
 package org.apache.hudi.utilities.sources.debezium;

 import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;

+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

 public class TestMysqlDebeziumSource extends TestAbstractDebeziumSource {
@@ -93,4 +100,33 @@ public class TestMysqlDebeziumSource extends TestAbstractDebeziumSource {
     assertTrue(records.select(DebeziumConstants.ADDED_SEQ_COL_NAME).collectAsList().stream()
         .allMatch(r -> r.getString(0).equals(EXPECTED_TEST_SEQ)));
   }
+
+  private String invokeGenerateUniqueSequence(String fileId, Long pos) throws Exception {
+    Method method = MysqlDebeziumSource.class.getDeclaredMethod("generateUniqueSequence", String.class, Long.class);
+    method.setAccessible(true);
+    try {
+      return (String) method.invoke(null, fileId, pos);
+    } catch (InvocationTargetException e) {
+      throw (Exception) e.getCause();
+    }
+  }
+
+  @Test
+  public void testGenerateUniqueSequenceValid() throws Exception {
+    assertEquals("288947.12345", invokeGenerateUniqueSequence("mysql-bin-changelog.288947", 12345L));
+    assertEquals("binlog.0", invokeGenerateUniqueSequence("binlog", 0L));
+  }
+
+  @Test
+  public void testGenerateUniqueSequenceInvalidFileId() {
+    assertThrows(HoodieReadFromSourceException.class, () -> invokeGenerateUniqueSequence(null, 12345L));
+    assertThrows(HoodieReadFromSourceException.class, () -> invokeGenerateUniqueSequence("", 0L));
+    assertThrows(HoodieReadFromSourceException.class, () -> invokeGenerateUniqueSequence("   ", 12345L));
+  }
+
+  @Test
+  public void testGenerateUniqueSequenceInvalidPos() {
+    assertThrows(HoodieReadFromSourceException.class, () -> invokeGenerateUniqueSequence("mysql-bin.288947", null));
+    assertThrows(HoodieReadFromSourceException.class, () -> invokeGenerateUniqueSequence("mysql-bin.288947", -1L));
+  }
 }

Reply via email to