This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da5816  [GOBBLIN-1359] Fix the error where an IOException thrown by the writer is swallowed by Reactivex
4da5816 is described below

commit 4da5816815fc5b0c228f9ad6225394b6cfcfe484
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jan 19 10:39:04 2021 -0800

    [GOBBLIN-1359] Fix the error where an IOException thrown by the writer is swallowed by Reactivex
    
    [GOBBLIN-1359] Fix the error where an IOException
    thrown by the writer is swallowed by Reactivex
    
    address comments
    
    add comments
    
    add comments
    
    Closes #3200 from ZihanLi58/GOBBLIN-1359
---
 .../main/java/org/apache/gobblin/writer/PartitionedDataWriter.java   | 4 +++-
 .../src/main/java/org/apache/gobblin/runtime/fork/Fork.java          | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 0af84cf..3154c85 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -52,6 +52,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.dataset.PartitionDescriptor;
+import org.apache.gobblin.exception.NonTransientException;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
 import 
org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
 import org.apache.gobblin.records.ControlMessageHandler;
@@ -245,7 +246,8 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
       // If the write take a long time, which is 1/3 of cache expiration time, 
we fail the writer to avoid data loss
       // and further slowness on the same HDFS block
       if (timeForWriting / 1000 > this.writeTimeoutInterval ) {
-        throw new IOException(String.format("Write record took %s s, but 
threshold is %s s",
+        //Use NonTransientException to avoid writer retry; in this case, retrying would also cause data loss
+        throw new NonTransientException(String.format("Write record took %s s, 
but threshold is %s s",
             timeForWriting / 1000, writeTimeoutInterval));
       }
     } catch (ExecutionException ee) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index b35a51e..d2ff89c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -227,7 +227,10 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
 
           r.ack();
         }
-      }, e -> logger.error("Failed to process record.", e),
+      }, e -> {
+          logger.error("Failed to process record.", e);
+          throw(new RuntimeException(e));
+          },
         () -> {
           if (this.writer.isPresent()) {
             this.writer.get().close();

Reply via email to