XuQianJin-Stars commented on a change in pull request #4987:
URL: https://github.com/apache/hudi/pull/4987#discussion_r826520719



##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
##########
@@ -120,6 +132,124 @@ public void testChainedJsonKafkaSourcePostProcessor() {
     assertEquals(0, fetch1.getBatch().get().count());
   }
 
+  @Test
+  public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
+    // ------------------------------------------------------------------------
+    //  Maxwell data
+    // ------------------------------------------------------------------------
+
+    // database hudi, table hudi_maxwell_01 (insert, update and delete)
+    String hudiMaxwell01Insert = 
"{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\","
+        + 
"\"ts\":1647074402,\"xid\":6233,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":18,\"insert_time\":\"2022-03-12 
08:40:02\","
+        + "\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Update = 
"{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"update\","
+        + 
"\"ts\":1647074482,\"xid\":6440,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 
04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"},"
+        + "\"old\":{\"age\":18,\"insert_time\":\"2022-03-12 
08:40:02\",\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Delete = 
"{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"delete\","
+        + 
"\"ts\":1647074555,\"xid\":6631,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 
04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}}";
+
+    String hudiMaxwell01Ddl = 
"{\"type\":\"table-alter\",\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\","
+        + 
"\"old\":{\"database\":\"hudi\",\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\","
+        + 
"\"primary-key\":[\"id\"],\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},"
+        + 
"{\"type\":\"varchar\",\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\","
+        + 
"\"signed\":true},{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},"
+        + 
"{\"type\":\"timestamp\",\"name\":\"update_time\",\"column-length\":0}]},\"def\":{\"database\":\"hudi\","
+        + 
"\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\",\"primary-key\":[\"id\"],"
+        + 
"\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},{\"type\":\"varchar\","
+        + 
"\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\",\"signed\":true},"
+        + 
"{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},{\"type\":\"timestamp\","
+        + 
"\"name\":\"update_time\",\"column-length\":0}]},\"ts\":1647072305000,\"sql\":\"/*
 ApplicationName=DBeaver "
+        + "21.0.4 - Main */ ALTER TABLE hudi.hudi_maxwell_01 MODIFY COLUMN age 
int(3) NULL\"}";
+
+    // database hudi, table hudi_maxwell_010, insert
+    String hudiMaxwell010Insert = 
"{\"database\":\"hudi\",\"table\":\"hudi_maxwell_010\",\"type\":\"insert\","
+        + 
"\"ts\":1647073982,\"xid\":5164,\"commit\":true,\"data\":{\"id\":\"f3eaf4cdf7534e47a88cdf93d19b2ee6\","
+        + "\"name\":\"wangxianghu\",\"age\":18,\"insert_time\":\"2022-03-12 
08:33:02\","
+        + "\"update_time\":\"2022-03-12 08:33:02\"}}";
+
+    // database hudi_02, table hudi_maxwell_02, insert
+    String hudi02Maxwell02Insert = 
"{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_02\",\"type\":\"insert\","
+        + 
"\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\","
+        + "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 
08:31:56\","
+        + "\"update_time\":\"2022-03-12 08:31:56\"}}";
+
+    // ------------------------------------------------------------------------
+    //  Tests
+    // ------------------------------------------------------------------------
+
+    ObjectMapper mapper = new ObjectMapper();
+    TypedProperties props = new TypedProperties();
+    
props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(),
 "hudi(_)?[0-9]{0,2}");
+    
props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(),
 "hudi_maxwell(_)?[0-9]{0,2}");
+
+    // test insert and update
+    JavaRDD<String> inputInsertAndUpdate = 
jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell01Update));
+    MaxwellJsonKafkaSourcePostProcessor processor = new 
MaxwellJsonKafkaSourcePostProcessor(props);
+    
processor.process(inputInsertAndUpdate).map(mapper::readTree).foreach(record -> 
{
+      // database name should be null
+      JsonNode database = record.get("database");
+      // insert and update records should be tagged as not deleted
+      boolean isDelete = 
record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue();
+
+      assertFalse(isDelete);
+      assertNull(database);

Review comment:
       Why is the `database` field expected to be null here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to