This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 50b6ca998 [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files (#3821)
50b6ca998 is described below

commit 50b6ca998d47e4b42b6823d0c3816e12ad89d79d
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Nov 7 12:59:54 2023 -0800

    [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files (#3821)
    
    * [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files
    
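    This commit moves the existing post-close ORC file validation in the
    commit phase into a helper, assertOrcFileIsValid, which deletes a
    corrupted staging file, emits a CorruptedOrcFileDeletion event (namespace
    gobblin.orc.writer) through the writer's MetricContext, and rethrows the
    error. The MetricContext is now created in GobblinBaseOrcWriter instead of
    InstrumentedGobblinOrcWriter. It also adds a unit test for ORC file
    validation.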
    
    * Linter fixes
---
 .../gobblin/writer/GobblinBaseOrcWriter.java       | 47 ++++++++++++---
 .../writer/InstrumentedGobblinOrcWriter.java       |  6 +-
 .../gobblin/writer/GobblinBaseOrcWriterTest.java   | 67 ++++++++++++++++++++++
 3 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 9f81e82d9..bb6c11aaa 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -24,8 +24,9 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.gobblin.util.HadoopUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
@@ -37,18 +38,28 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.state.ConstructState;
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JobConfigurationUtils;
 
+
 /**
  * A wrapper for ORC-core writer without dependency on Hive SerDe library.
  */
 @Slf4j
 public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
+  public static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";
+  public static final String CORRUPTED_ORC_FILE_DELETION_EVENT = 
"CorruptedOrcFileDeletion";
 
+  protected final MetricContext metricContext;
   protected final OrcValueWriter<D> valueWriter;
   @VisibleForTesting
   VectorizedRowBatch rowBatch;
@@ -113,6 +124,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
     this.orcWriterStripeSizeBytes = 
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) 
OrcConf.STRIPE_SIZE.getDefaultValue());
     this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch, 
properties);
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, 
properties);
+    this.metricContext = getMetricContext();
 
     // Track the number of other writer tasks from different datasets 
ingesting on the same container
     this.concurrentWriterTasks = 
properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_CONCURRENT_TASKS,
 1);
@@ -262,16 +274,11 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   public void commit()
       throws IOException {
     closeInternal();
-    // Validate the ORC file after writer close. Default is false as it 
introduce more load to FS and decrease the performance
+    // Validate the ORC file after writer close. Default is false as it 
introduce more load to FS from an extra read call
     if(this.validateORCAfterClose) {
-      try (Reader reader = OrcFile.createReader(this.stagingFile, new 
OrcFile.ReaderOptions(conf))) {
-      } catch (Exception e) {
-        log.error("Found error when validating staging ORC file {} during 
commit phase. "
-            + "Will delete the malformed file and terminate the commit", 
this.stagingFile, e);
-        HadoopUtils.deletePath(this.fs, this.stagingFile, false);
-        throw e;
-      }
+      assertOrcFileIsValid(this.fs, this.stagingFile, new 
OrcFile.ReaderOptions(conf), this.metricContext);
     }
+
     super.commit();
 
     if (this.selfTuningWriter) {
@@ -375,4 +382,26 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
       this.flush();
     }
   }
+
+  protected MetricContext getMetricContext() {
+    return Instrumented.getMetricContext(new State(properties), 
this.getClass());
+  }
+
+  @VisibleForTesting
+  @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
+    justification = "Find bugs believes the eventBuilder is always null and 
that there is a null check, "
+        + "but both are not true.")
+  static void assertOrcFileIsValid(FileSystem fs, Path filePath, 
OrcFile.ReaderOptions readerOptions, MetricContext metricContext) throws 
IOException {
+    try (Reader ignored = OrcFile.createReader(filePath, readerOptions)) {
+    } catch (Exception e) {
+      log.error("Found error when validating staging ORC file {} during the 
commit phase. "
+          + "Will delete the malformed file and abort the commit by throwing 
an exception", filePath, e);
+      HadoopUtils.deletePath(fs, filePath, false);
+      GobblinEventBuilder eventBuilder = new 
GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT, 
GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE);
+      eventBuilder.addMetadata("filePath", filePath.toString());
+      EventSubmitter.submit(metricContext, eventBuilder);
+
+      throw e;
+    }
+  }
 }
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java
index c32f1e4e3..ff2fa26b1 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 
@@ -39,18 +37,16 @@ import org.apache.gobblin.metrics.event.GobblinEventBuilder;
  */
 @Slf4j
 public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter {
-  MetricContext metricContext;
+
   public static final String METRICS_SCHEMA_NAME = "schemaName";
   public static final String METRICS_BYTES_WRITTEN = "bytesWritten";
   public static final String METRICS_RECORDS_WRITTEN = "recordsWritten";
   public static final String METRICS_BUFFER_RESIZES = "bufferResizes";
   public static final String METRICS_BUFFER_SIZE = "bufferSize";
   public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics";
-  private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";
 
   public InstrumentedGobblinOrcWriter(FsDataWriterBuilder<Schema, 
GenericRecord> builder, State properties) throws IOException {
     super(builder, properties);
-    metricContext = Instrumented.getMetricContext(new State(properties), 
this.getClass());
   }
 
   @Override
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java
new file mode 100644
index 000000000..813870597
--- /dev/null
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.FileFormatException;
+import org.apache.orc.OrcFile;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+import static 
org.apache.gobblin.writer.GobblinBaseOrcWriter.CORRUPTED_ORC_FILE_DELETION_EVENT;
+
+
+public class GobblinBaseOrcWriterTest {
+
+  @Test
+  public void testOrcValidation()
+      throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    File tmpDir = Files.createTempDir();
+    File corruptedOrcFile = new File(tmpDir, "test.orc");
+    try (FileWriter writer = new FileWriter(corruptedOrcFile)) {
+      // write a corrupted ORC file that only contains the header but without 
content
+      writer.write(OrcFile.MAGIC);
+    }
+
+    OrcFile.ReaderOptions readerOptions = new OrcFile.ReaderOptions(conf);
+
+    MetricContext mockContext = Mockito.mock(MetricContext.class);
+    Path p = new Path(corruptedOrcFile.getAbsolutePath());
+    Assert.assertThrows(FileFormatException.class,
+        () -> GobblinBaseOrcWriter.assertOrcFileIsValid(fs, p, readerOptions, 
mockContext));
+
+    GobblinEventBuilder eventBuilder = new 
GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT, 
GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE);
+    eventBuilder.addMetadata("filePath", p.toString());
+    Mockito.verify(mockContext, Mockito.times(1))
+        .submitEvent(eventBuilder.build());
+  }
+}

Reply via email to