This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c1bb67c62 [Hotfix][GOBBLIN-1949] add option to detect malformed orc 
during commit (#3818)
c1bb67c62 is described below

commit c1bb67c6240e35005c55b018360fcd95b31d8410
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Thu Nov 2 15:11:19 2023 -0700

    [Hotfix][GOBBLIN-1949] add option to detect malformed orc during commit 
(#3818)
    
    * add option to detect malformed ORC during commit phase
    
    * better logging
    
    * address comment
    
    * catch more generic exception
    
    * validate ORC file after close
    
    * move validate in between close and commit
    
    * syntax
    
    * whitespace
    
    * update log
---
 .../org/apache/gobblin/writer/GobblinBaseOrcWriter.java   | 15 +++++++++++++++
 .../apache/gobblin/writer/GobblinOrcWriterConfigs.java    |  5 +++++
 2 files changed, 20 insertions(+)

diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index d8a2a353e..9f81e82d9 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -24,10 +24,12 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -61,6 +63,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   protected int batchSize;
   protected final S inputSchema;
 
+  private final boolean validateORCAfterClose;
   private final boolean selfTuningWriter;
   private int selfTuneRowsBetweenCheck;
   private double rowBatchMemoryUsageFactor;
@@ -94,6 +97,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     this.inputSchema = builder.getSchema();
     this.typeDescription = getOrcSchema();
     this.selfTuningWriter = 
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
 false);
+    this.validateORCAfterClose = 
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE,
 false);
     this.maxOrcBatchSize = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
         GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
     this.batchSize = this.selfTuningWriter ?
@@ -258,7 +262,18 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   public void commit()
       throws IOException {
     closeInternal();
+    // Validate the ORC file after the writer is closed. Disabled by default because it 
introduces more load on the FS and decreases performance
+    if(this.validateORCAfterClose) {
+      try (Reader reader = OrcFile.createReader(this.stagingFile, new 
OrcFile.ReaderOptions(conf))) {
+      } catch (Exception e) {
+        log.error("Found error when validating staging ORC file {} during 
commit phase. "
+            + "Will delete the malformed file and terminate the commit", 
this.stagingFile, e);
+        HadoopUtils.deletePath(this.fs, this.stagingFile, false);
+        throw e;
+      }
+    }
     super.commit();
+
     if (this.selfTuningWriter) {
       
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE,
 String.valueOf(getEstimatedRecordSizeBytes()));
       
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
index b0b859f93..89621d38c 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
@@ -22,6 +22,11 @@ package org.apache.gobblin.writer;
  */
 public class GobblinOrcWriterConfigs {
   public static final String ORC_WRITER_PREFIX = "orcWriter.";
+  /**
+   * Configuration key for enabling validation of the ORC file to detect malformation. 
If enabled, the writer will delete the malformed
+   * ORC staging file and rethrow the exception during commit.
+   */
+  public static final String ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE = 
ORC_WRITER_PREFIX + "validate.file.after.close";
   /**
    * Default buffer size in the ORC Writer before sending the records to the 
native ORC Writer
    */

Reply via email to