hudi-agent commented on code in PR #18914:
URL: https://github.com/apache/hudi/pull/18914#discussion_r3368787285


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java:
##########
@@ -55,8 +55,9 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Datas
     for (TransformerInfo transformerInfo : transformers) {
       Transformer transformer = transformerInfo.getTransformer();
       dataset = transformer.apply(jsc, sparkSession, dataset, transformerInfo.getProperties(properties, transformers));
-      // validate in every stage to ensure ErrorRecordColumn not dropped by one of the transformer and added by next transformer.
-      ErrorTableUtils.validate(dataset);
+      // Re-inject _corrupt_record if the transformer dropped it (e.g. custom JAR transformers
+      // that do column projection like ColumnFilter with mode=include).
+      dataset = ErrorTableUtils.addNullValueErrorTableCorruptRecordColumn(dataset);

Review Comment:
   🤖 If an earlier transformer in the chain populates `_corrupt_record` with 
actual error rows and a later transformer drops the column, this change silently 
re-injects it as an all-null column, so the previously captured error data is 
lost without warning. Previously, `validate()` would have surfaced this as an 
exception. Is that an acceptable tradeoff for the column-projection use case, or 
is it worth at least logging a warning when the column is dropped after holding 
non-null values? @yihua
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
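
   A minimal sketch of the warning idea raised above, modelled on plain 
column-name arrays rather than a Spark `Dataset` so that it stays 
self-contained. `CorruptRecordDropCheck`, `wasDropped`, and `warningFor` are 
hypothetical names, not Hudi APIs, and the column name is assumed to be 
`_corrupt_record` as in the diff comment. The sketch only compares schemas 
before and after a transformer; telling whether the dropped column actually 
held non-null values would need an extra Spark action per stage, which is 
deliberately left out here.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Hudi.
class CorruptRecordDropCheck {
  // Assumed to match the column name referenced in the diff comment.
  static final String CORRUPT_RECORD_COL = "_corrupt_record";

  // True when the column existed before the transformer ran and is absent afterwards.
  static boolean wasDropped(String[] columnsBefore, String[] columnsAfter) {
    return Arrays.asList(columnsBefore).contains(CORRUPT_RECORD_COL)
        && !Arrays.asList(columnsAfter).contains(CORRUPT_RECORD_COL);
  }

  // Builds the warning text to log, or returns empty when nothing was dropped.
  static Optional<String> warningFor(String transformerName,
                                     String[] columnsBefore,
                                     String[] columnsAfter) {
    if (!wasDropped(columnsBefore, columnsAfter)) {
      return Optional.empty();
    }
    return Optional.of("Transformer " + transformerName + " dropped " + CORRUPT_RECORD_COL
        + "; it will be re-added as null and any captured error data is lost");
  }
}
```

   In the transformer loop, one could capture `dataset.columns()` before and 
after `transformer.apply(...)` and log the returned message at warn level when 
it is present. This is a schema-only check, so it would also warn when the 
dropped column was entirely null.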



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java:
##########
@@ -86,7 +87,45 @@ public void testForErrorRecordColumn() {
     TypedProperties properties = new TypedProperties();
     properties.setProperty(ERROR_TABLE_ENABLED.key(), "true");
     ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(t1, t2, t3));
-    assertThrows(HoodieValidationException.class, () -> transformer.apply(jsc(), spark(), original, properties));
+    Dataset<Row> transformed = transformer.apply(jsc(), spark(), original, properties);
+
+    assertArrayEquals(new String[]{"foo", ERROR_TABLE_CURRUPT_RECORD_COL_NAME}, transformed.columns());
+    assertEquals(2, transformed.count());
+  }
+
+  @Test
+  void testCustomColumnProjectionPreservesCorruptRecord() {
+    // Single custom transformer doing column projection — drops 
_corrupt_record.
+    Dataset<Row> original = getTestDataset();
+
+    Transformer columnFilter = (jsc, sparkSession, dataset, props) -> dataset.select("foo");
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(ERROR_TABLE_ENABLED.key(), "true");
+    ErrorTableAwareChainedTransformer transformer =
+        new ErrorTableAwareChainedTransformer(Arrays.asList(columnFilter));
+    Dataset<Row> transformed = transformer.apply(jsc(), spark(), original, properties);
+
+    assertArrayEquals(new String[]{"foo", ERROR_TABLE_CURRUPT_RECORD_COL_NAME}, transformed.columns());
+    assertEquals(2, transformed.count());
+    assertEquals(2, transformed.filter(new Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNull()).count());
+  }
+
+  @Test
+  void testTransformerPreservingCorruptRecordIsNoOp() {
+    // Transformer that keeps all columns — re-injection is a no-op.
+    Dataset<Row> original = getTestDataset();
+
+    Transformer keepAll = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("foo",
+        dataset.col("foo").cast(IntegerType));
+    TypedProperties properties = new TypedProperties();

Review Comment:
   🤖 nit: the lambda parameter here is named `properties`, while the sibling 
test `testCustomColumnProjectionPreservesCorruptRecord` uses `props` for the 
same position. Could you use `props` here too for consistency? It also avoids 
the visual confusion with `TypedProperties properties` declared two lines below.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>


