This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c1bb67c62 [Hotfix][GOBBLIN-1949] add option to detect malformed orc
during commit (#3818)
c1bb67c62 is described below
commit c1bb67c6240e35005c55b018360fcd95b31d8410
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Thu Nov 2 15:11:19 2023 -0700
[Hotfix][GOBBLIN-1949] add option to detect malformed orc during commit
(#3818)
* add option to detect malformed ORC during commit phase
* better logging
* address comment
* catch more generic exception
* validate ORC file after close
* move validate in between close and commit
* syntax
* whitespace
* update log
---
.../org/apache/gobblin/writer/GobblinBaseOrcWriter.java | 15 +++++++++++++++
.../apache/gobblin/writer/GobblinOrcWriterConfigs.java | 5 +++++
2 files changed, 20 insertions(+)
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index d8a2a353e..9f81e82d9 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -24,10 +24,12 @@ import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -61,6 +63,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
protected int batchSize;
protected final S inputSchema;
+ private final boolean validateORCAfterClose;
private final boolean selfTuningWriter;
private int selfTuneRowsBetweenCheck;
private double rowBatchMemoryUsageFactor;
@@ -94,6 +97,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
this.selfTuningWriter =
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
false);
+ this.validateORCAfterClose =
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE,
false);
this.maxOrcBatchSize =
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
@@ -258,7 +262,18 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
public void commit()
throws IOException {
closeInternal();
+ // Validate the ORC file after writer close. Default is false as it
introduce more load to FS and decrease the performance
+ if(this.validateORCAfterClose) {
+ try (Reader reader = OrcFile.createReader(this.stagingFile, new
OrcFile.ReaderOptions(conf))) {
+ } catch (Exception e) {
+ log.error("Found error when validating staging ORC file {} during
commit phase. "
+ + "Will delete the malformed file and terminate the commit",
this.stagingFile, e);
+ HadoopUtils.deletePath(this.fs, this.stagingFile, false);
+ throw e;
+ }
+ }
super.commit();
+
if (this.selfTuningWriter) {
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE,
String.valueOf(getEstimatedRecordSizeBytes()));
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
index b0b859f93..89621d38c 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
@@ -22,6 +22,11 @@ package org.apache.gobblin.writer;
*/
public class GobblinOrcWriterConfigs {
public static final String ORC_WRITER_PREFIX = "orcWriter.";
+ /**
+ * Configuration for enabling validation of ORC file to detect malformation.
If enabled, will throw exception and
+ * delete malformed ORC file during commit
+ */
+ public static final String ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE =
ORC_WRITER_PREFIX + "validate.file.after.close";
/**
* Default buffer size in the ORC Writer before sending the records to the
native ORC Writer
*/