Will-Lo commented on code in PR #3818:
URL: https://github.com/apache/gobblin/pull/3818#discussion_r1380594338
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -258,7 +261,18 @@ public void close()
public void commit()
throws IOException {
closeInternal();
+ if(this.validateORCDuringCommit) {
+ try {
+       OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf));
+ } catch (Exception e) {
+       log.error("Found error when validating ORC file during commit phase", e);
+ HadoopUtils.deletePath(this.fs, this.stagingFile, false);
Review Comment:
Do we want to do this validation here? If files are being published at the folder
level in a different step, should the reader instead validate all the files going
into publish at that step? My concern is that even if we validate and delete
files at the writer step, this still assumes the writer will shut down along the
happy path rather than suddenly. If it shuts down due to a container error code
(e.g. 143) and never goes through the cleanup process, the corrupt file can still
be moved to another folder later down the line.
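The folder-level check the reviewer suggests could be sketched roughly as below. This is a hypothetical, simplified illustration: it uses `java.nio.file` and a pluggable `isValid` predicate in place of the real `FileSystem`/`OrcFile.createReader` calls, and the method and class names are invented, not from the Gobblin codebase.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class PublishValidator {

  // Validate every staged file immediately before publish; delete files that
  // fail the check. `isValid` stands in for a real check such as attempting
  // OrcFile.createReader on the file.
  static List<Path> validateStagingDir(Path stagingDir, Predicate<Path> isValid)
      throws IOException {
    List<Path> publishable = new ArrayList<>();
    try (DirectoryStream<Path> files = Files.newDirectoryStream(stagingDir)) {
      for (Path file : files) {
        if (isValid.test(file)) {
          publishable.add(file);
        } else {
          // Corrupt file: drop it so it is never moved to the publish folder,
          // even if the writer that produced it died without running cleanup.
          Files.delete(file);
        }
      }
    }
    return publishable;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("staging");
    Files.write(dir.resolve("good.orc"), new byte[]{1});
    Files.write(dir.resolve("bad.orc"), new byte[0]);
    // Hypothetical validity check: non-empty files pass.
    List<Path> ok = validateStagingDir(dir, p -> {
      try {
        return Files.size(p) > 0;
      } catch (IOException e) {
        return false;
      }
    });
    System.out.println(ok.size());
    System.out.println(Files.exists(dir.resolve("bad.orc")));
  }
}
```

Because the check runs at publish time, it also covers files left behind by writers that were killed (e.g. exit code 143) before their own commit-phase validation could run.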
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -258,7 +261,18 @@ public void close()
public void commit()
throws IOException {
closeInternal();
+ if(this.validateORCDuringCommit) {
Review Comment:
+1 I think the current issue is caused during the close sequence, so we need to destroy the file:
```
Caused by: java.nio.channels.ClosedChannelException
	at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.throwException4Close(DataStreamer.java:324)
	at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:152)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.hadoop.fs.RetryingOutputStream.lambda$write$1(RetryingOutputStream.java:237)
	at org.apache.hadoop.fs.RetryPolicy.lambda$run$0(RetryPolicy.java:137)
	at org.apache.hadoop.fs.NoOpRetryPolicy.run(NoOpRetryPolicy.java:36)
	at org.apache.hadoop.fs.RetryPolicy.run(RetryPolicy.java:136)
	at org.apache.hadoop.fs.RetryingOutputStream.runWithRetries(RetryingOutputStream.java:301)
	at org.apache.hadoop.fs.RetryingOutputStream.write(RetryingOutputStream.java:234)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.orc.impl.PhysicalFsWriter$DirectStream.output(PhysicalFsWriter.java:314)
	at org.apache.orc.impl.OutStream.outputBuffer(OutStream.java:163)
	at org.apache.orc.impl.OutStream.flush(OutStream.java:359)
	at org.apache.orc.impl.PhysicalFsWriter.writeFileFooter(PhysicalFsWriter.java:457)
```
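For context on the root cause above: `ClosedChannelException` is what Java NIO throws when a write is attempted on a channel that has already been closed, which matches the theory that the stream was torn down mid-close before the ORC footer was written. A minimal standalone reproduction (not Gobblin code, just the underlying NIO behavior):

```java
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelDemo {
  public static void main(String[] args) throws Exception {
    Path tmp = Files.createTempFile("demo", ".bin");
    FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE);
    ch.close(); // channel closed first, mimicking a torn-down output stream
    try {
      // Any subsequent write fails, just like the footer write in the trace.
      ch.write(ByteBuffer.wrap(new byte[]{1}));
    } catch (ClosedChannelException e) {
      System.out.println("ClosedChannelException");
    }
  }
}
```

A file that hits this during `close()` has no valid footer, which is why the validation-plus-delete step above is needed to keep it out of the publish path.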