codope commented on code in PR #8520:
URL: https://github.com/apache/hudi/pull/8520#discussion_r1187029695


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java:
##########
@@ -28,32 +28,39 @@
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import static org.apache.spark.sql.functions.lit;
+
 import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS;
 import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_FAILURE_STRATEGY;
+import static 
org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;

Review Comment:
   So is my understanding correct that this column is never written to storage, 
regardless of whether the source is incremental or something else? That is, it 
is used only for internal error-table validation and is never persisted.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java:
##########
@@ -28,32 +28,39 @@
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import static org.apache.spark.sql.functions.lit;
+
 import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS;
 import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_FAILURE_STRATEGY;
+import static 
org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
 
 public final class ErrorTableUtils {
-
   public static Option<BaseErrorTableWriter> 
getErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
-      TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+                                                                 
TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
     String errorTableWriterClass = 
props.getString(ERROR_TABLE_WRITE_CLASS.key());
     
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
         "Missing error table config " + ERROR_TABLE_WRITE_CLASS);
 
-    Class<?>[] argClassArr = new Class[] {HoodieDeltaStreamer.Config.class,
+    Class<?>[] argClassArr = new Class[]{HoodieDeltaStreamer.Config.class,
         SparkSession.class, TypedProperties.class, JavaSparkContext.class, 
FileSystem.class};
     String errMsg = "Unable to instantiate ErrorTableWriter with arguments 
type " + Arrays.toString(argClassArr);
     
ValidationUtils.checkArgument(ReflectionUtils.hasConstructor(BaseErrorTableWriter.class.getName(),
 argClassArr), errMsg);
 
     try {
       return Option.of((BaseErrorTableWriter) 
ReflectionUtils.getClass(errorTableWriterClass).getConstructor(argClassArr)
           .newInstance(cfg, sparkSession, props, jssc, fs));
-    } catch (NoSuchMethodException | InvocationTargetException | 
InstantiationException | IllegalAccessException e) {
+    } catch (NoSuchMethodException | InvocationTargetException | 
InstantiationException
+             | IllegalAccessException e) {

Review Comment:
   nit: for better readability, either keep all the exception types on a single 
line or list one exception type per line.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java:
##########
@@ -63,4 +70,27 @@ public static 
HoodieErrorTableConfig.ErrorWriteFailureStrategy getErrorWriteFail
     String writeFailureStrategy = 
props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
     return 
HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
   }
+
+  /**
+   * validates for constraints on ErrorRecordColumn when ErrorTable enabled 
configs are set.
+   * @param dataset
+   */
+  public static void validate(Dataset<Row> dataset) {
+    if (!isErrorTableCorruptRecordColumnPresent(dataset)) {
+      throw new HoodieValidationException(String.format("Invalid condition, 
columnName=%s "
+              + "is not present in transformer " + "output schema", 
ERROR_TABLE_CURRUPT_RECORD_COL_NAME));
+    }
+  }
+
+  public static Dataset<Row> 
addNullValueErrorTableCorruptRecordColumn(Dataset<Row> dataset) {
+    if (!isErrorTableCorruptRecordColumnPresent(dataset)) {
+      dataset = dataset.withColumn(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, 
lit(null));
+    }
+    return dataset;
+  }
+
+  private static boolean isErrorTableCorruptRecordColumnPresent(Dataset<Row> 
dataset) {
+    return Arrays.stream(dataset.columns()).collect(Collectors.toList())

Review Comment:
   I guess the intention here is just a presence check, so the intermediate 
`collect` can be avoided. Can we simply do 
`Arrays.stream(dataset.columns()).anyMatch(col -> 
col.equalsIgnoreCase(ERROR_TABLE_CURRUPT_RECORD_COL_NAME))`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to