This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 50b6ca998 [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files
(#3821)
50b6ca998 is described below
commit 50b6ca998d47e4b42b6823d0c3816e12ad89d79d
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Nov 7 12:59:54 2023 -0800
[GOBBLIN-1951] Emit GTE when deleting corrupted ORC files (#3821)
* [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files
This commit moves the ORC file validation performed during the commit phase
into a helper that deletes the corrupted file and emits a
CorruptedOrcFileDeletion event (GTE) for the deletion. It also includes a test
for ORC file validation.
* Linter fixes
---
.../gobblin/writer/GobblinBaseOrcWriter.java | 47 ++++++++++++---
.../writer/InstrumentedGobblinOrcWriter.java | 6 +-
.../gobblin/writer/GobblinBaseOrcWriterTest.java | 67 ++++++++++++++++++++++
3 files changed, 106 insertions(+), 14 deletions(-)
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 9f81e82d9..bb6c11aaa 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -24,8 +24,9 @@ import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
@@ -37,18 +38,28 @@ import
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.state.ConstructState;
+import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JobConfigurationUtils;
+
/**
* A wrapper for ORC-core writer without dependency on Hive SerDe library.
*/
@Slf4j
public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
+ public static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";
+ public static final String CORRUPTED_ORC_FILE_DELETION_EVENT =
"CorruptedOrcFileDeletion";
+ protected final MetricContext metricContext;
protected final OrcValueWriter<D> valueWriter;
@VisibleForTesting
VectorizedRowBatch rowBatch;
@@ -113,6 +124,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
this.orcWriterStripeSizeBytes =
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long)
OrcConf.STRIPE_SIZE.getDefaultValue());
this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch,
properties);
this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema,
properties);
+ this.metricContext = getMetricContext();
// Track the number of other writer tasks from different datasets
ingesting on the same container
this.concurrentWriterTasks =
properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_CONCURRENT_TASKS,
1);
@@ -262,16 +274,11 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
public void commit()
throws IOException {
closeInternal();
- // Validate the ORC file after writer close. Default is false as it
introduce more load to FS and decrease the performance
+ // Validate the ORC file after writer close. Default is false as it
introduces more load to FS from an extra read call
if(this.validateORCAfterClose) {
- try (Reader reader = OrcFile.createReader(this.stagingFile, new
OrcFile.ReaderOptions(conf))) {
- } catch (Exception e) {
- log.error("Found error when validating staging ORC file {} during
commit phase. "
- + "Will delete the malformed file and terminate the commit",
this.stagingFile, e);
- HadoopUtils.deletePath(this.fs, this.stagingFile, false);
- throw e;
- }
+ assertOrcFileIsValid(this.fs, this.stagingFile, new
OrcFile.ReaderOptions(conf), this.metricContext);
}
+
super.commit();
if (this.selfTuningWriter) {
@@ -375,4 +382,26 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
this.flush();
}
}
+
+ protected MetricContext getMetricContext() {
+ return Instrumented.getMetricContext(new State(properties),
this.getClass());
+ }
+
+ @VisibleForTesting
+ @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
+ justification = "Find bugs believes the eventBuilder is always null and
that there is a null check, "
+ + "but both are not true.")
+ static void assertOrcFileIsValid(FileSystem fs, Path filePath,
OrcFile.ReaderOptions readerOptions, MetricContext metricContext) throws
IOException {
+ try (Reader ignored = OrcFile.createReader(filePath, readerOptions)) {
+ } catch (Exception e) {
+ log.error("Found error when validating staging ORC file {} during the
commit phase. "
+ + "Will delete the malformed file and abort the commit by throwing
an exception", filePath, e);
+ HadoopUtils.deletePath(fs, filePath, false);
+ GobblinEventBuilder eventBuilder = new
GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT,
GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE);
+ eventBuilder.addMetadata("filePath", filePath.toString());
+ EventSubmitter.submit(metricContext, eventBuilder);
+
+ throw e;
+ }
+ }
}
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java
index c32f1e4e3..ff2fa26b1 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
@@ -39,18 +37,16 @@ import org.apache.gobblin.metrics.event.GobblinEventBuilder;
*/
@Slf4j
public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter {
- MetricContext metricContext;
+
public static final String METRICS_SCHEMA_NAME = "schemaName";
public static final String METRICS_BYTES_WRITTEN = "bytesWritten";
public static final String METRICS_RECORDS_WRITTEN = "recordsWritten";
public static final String METRICS_BUFFER_RESIZES = "bufferResizes";
public static final String METRICS_BUFFER_SIZE = "bufferSize";
public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics";
- private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";
public InstrumentedGobblinOrcWriter(FsDataWriterBuilder<Schema,
GenericRecord> builder, State properties) throws IOException {
super(builder, properties);
- metricContext = Instrumented.getMetricContext(new State(properties),
this.getClass());
}
@Override
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java
new file mode 100644
index 000000000..813870597
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.FileFormatException;
+import org.apache.orc.OrcFile;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+import static
org.apache.gobblin.writer.GobblinBaseOrcWriter.CORRUPTED_ORC_FILE_DELETION_EVENT;
+
+
+public class GobblinBaseOrcWriterTest {
+
+ @Test
+ public void testOrcValidation()
+ throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ File tmpDir = Files.createTempDir();
+ File corruptedOrcFile = new File(tmpDir, "test.orc");
+ try (FileWriter writer = new FileWriter(corruptedOrcFile)) {
+ // Write a corrupted ORC file that contains only the header, without any
content
+ writer.write(OrcFile.MAGIC);
+ }
+
+ OrcFile.ReaderOptions readerOptions = new OrcFile.ReaderOptions(conf);
+
+ MetricContext mockContext = Mockito.mock(MetricContext.class);
+ Path p = new Path(corruptedOrcFile.getAbsolutePath());
+ Assert.assertThrows(FileFormatException.class,
+ () -> GobblinBaseOrcWriter.assertOrcFileIsValid(fs, p, readerOptions,
mockContext));
+
+ GobblinEventBuilder eventBuilder = new
GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT,
GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE);
+ eventBuilder.addMetadata("filePath", p.toString());
+ Mockito.verify(mockContext, Mockito.times(1))
+ .submitEvent(eventBuilder.build());
+ }
+}