Will-Lo commented on code in PR #3818:
URL: https://github.com/apache/gobblin/pull/3818#discussion_r1380594338


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -258,7 +261,18 @@ public void close()
   public void commit()
       throws IOException {
     closeInternal();
+    if(this.validateORCDuringCommit) {
+      try {
+        OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf));
+      } catch (Exception e) {
+        log.error("Found error when validating ORC file during commit phase", e);
+        HadoopUtils.deletePath(this.fs, this.stagingFile, false);

Review Comment:
   Do we want to do this validation here, or, since files are published at the 
folder level in a different step, should the reader validate all the files 
going into publish at that point instead? My concern is that validating and 
deleting files at the writer step still assumes this writer shuts down along 
the happy path rather than suddenly. If it shuts down due to a container error 
code (e.g. 143) and never goes through the cleanup process, the corrupt file 
can still be moved to another folder later down the line.
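A rough sketch of the folder-level alternative raised above: validate every staged file on the reader side just before publish, and only move the ones that pass. The helper below is hypothetical (not Gobblin's actual publisher API) and uses a plain predicate so it stays self-contained; in the real flow the predicate would wrap `OrcFile.createReader(path, new OrcFile.ReaderOptions(conf))` in a try/catch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical publish-side filter: keep only files that pass ORC validation.
// Files that fail would be deleted or quarantined by the publisher instead of
// being moved to the output folder, even if the writer died before cleanup.
public class PublishValidation {
  public static List<String> filterPublishable(List<String> stagedFiles,
                                               Predicate<String> orcValidator) {
    List<String> publishable = new ArrayList<>();
    for (String file : stagedFiles) {
      // In the real publisher, orcValidator would attempt
      // OrcFile.createReader(...) and treat any exception as "corrupt".
      if (orcValidator.test(file)) {
        publishable.add(file);
      }
    }
    return publishable;
  }
}
```

Because this runs at publish time, it catches files from writers that were killed mid-stream (e.g. exit code 143) as well as files the writer validated itself.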



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -258,7 +261,18 @@ public void close()
   public void commit()
       throws IOException {
     closeInternal();
+    if(this.validateORCDuringCommit) {

Review Comment:
   +1. I think the current issue is caused during the close sequence, so we 
need to destroy the file:
   ```
   Caused by: java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.throwException4Close(DataStreamer.java:324)
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:152)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.hadoop.fs.RetryingOutputStream.lambda$write$1(RetryingOutputStream.java:237)
        at org.apache.hadoop.fs.RetryPolicy.lambda$run$0(RetryPolicy.java:137)
        at org.apache.hadoop.fs.NoOpRetryPolicy.run(NoOpRetryPolicy.java:36)
        at org.apache.hadoop.fs.RetryPolicy.run(RetryPolicy.java:136)
        at org.apache.hadoop.fs.RetryingOutputStream.runWithRetries(RetryingOutputStream.java:301)
        at org.apache.hadoop.fs.RetryingOutputStream.write(RetryingOutputStream.java:234)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.orc.impl.PhysicalFsWriter$DirectStream.output(PhysicalFsWriter.java:314)
        at org.apache.orc.impl.OutStream.outputBuffer(OutStream.java:163)
        at org.apache.orc.impl.OutStream.flush(OutStream.java:359)
        at org.apache.orc.impl.PhysicalFsWriter.writeFileFooter(PhysicalFsWriter.java:457)
   ```
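The stack trace shows the footer write failing, which is exactly what the PR's commit-time check is meant to catch: validate the staged file and delete it if validation throws, so a truncated file is never published. Below is a minimal local-filesystem analogue of that pattern; the `Validator` interface and `validateOrDelete` helper are illustrative stand-ins (in the PR, validation is `OrcFile.createReader(...)` and deletion is `HadoopUtils.deletePath(...)`).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the commit-time validate-or-delete pattern from the PR diff,
// using java.nio.file so it runs locally without Hadoop/ORC on the classpath.
public class CommitValidator {
  interface Validator {
    void validate(Path p) throws IOException;
  }

  // Returns true if the staged file validated; otherwise deletes it
  // (analogue of HadoopUtils.deletePath) and returns false.
  static boolean validateOrDelete(Path staging, Validator v) throws IOException {
    try {
      v.validate(staging); // analogue of OrcFile.createReader(...)
      return true;
    } catch (Exception e) {
      Files.deleteIfExists(staging);
      return false;
    }
  }
}
```

As the earlier comment notes, this only helps when `commit()` actually runs; a writer killed before commit still needs a publish-side check.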



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.