This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4da5816 [GOBBLIN-1359] Fix the error where an IOException thrown by
the writer is swallowed by Reactivex
4da5816 is described below
commit 4da5816815fc5b0c228f9ad6225394b6cfcfe484
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jan 19 10:39:04 2021 -0800
[GOBBLIN-1359] Fix the error where an IOException thrown by the writer is
swallowed by Reactivex
[GOBBLIN-1359] Fix the error where an IOException
thrown by the writer is swallowed by Reactivex
address comments
add comments
add comments
Closes #3200 from ZihanLi58/GOBBLIN-1359
---
.../main/java/org/apache/gobblin/writer/PartitionedDataWriter.java | 4 +++-
.../src/main/java/org/apache/gobblin/runtime/fork/Fork.java | 5 ++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 0af84cf..3154c85 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -52,6 +52,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.dataset.PartitionDescriptor;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import
org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import org.apache.gobblin.records.ControlMessageHandler;
@@ -245,7 +246,8 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
// If the write take a long time, which is 1/3 of cache expiration time,
we fail the writer to avoid data loss
// and further slowness on the same HDFS block
if (timeForWriting / 1000 > this.writeTimeoutInterval ) {
- throw new IOException(String.format("Write record took %s s, but
threshold is %s s",
+ //Use NonTransientException to avoid writer retry, in this case, retry
will also cause data loss
+ throw new NonTransientException(String.format("Write record took %s s,
but threshold is %s s",
timeForWriting / 1000, writeTimeoutInterval));
}
} catch (ExecutionException ee) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index b35a51e..d2ff89c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -227,7 +227,10 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
r.ack();
}
- }, e -> logger.error("Failed to process record.", e),
+ }, e -> {
+ logger.error("Failed to process record.", e);
+ throw(new RuntimeException(e));
+ },
() -> {
if (this.writer.isPresent()) {
this.writer.get().close();