This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d6f546e91 [GOBBLIN-1905] Fix bug where memory estimate occurs after
writer flush, leading to an underestimate of memory usage (#3769)
d6f546e91 is described below
commit d6f546e91f16d72d66c12182d86b416d54a8db83
Author: William Lo <[email protected]>
AuthorDate: Wed Sep 13 16:36:33 2023 -0400
[GOBBLIN-1905] Fix bug where memory estimate occurs after writer flush,
leading to an underestimate of memory usage (#3769)
* Fix bug where the memory estimate occurs after the writer flush, leading to
an underestimate of memory usage
* Fix edge case where the writer is not initialized before commit
* Clean up comment
* Add comment explaining some details around native ORC writer memory
estimation
---
.../java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index f7bc0c325..ae671eeb3 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -211,6 +211,13 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
initializeOrcFileWriter();
}
orcFileWriter.addRowBatch(rowBatch);
+ // Depending on the orcFileWriter orc.rows.between.memory.check, this
may be an underestimate depending on if it flushed right after
+ // adding the rows or not. However, since the rowBatch is reset and that
buffer is cleared, this should still be safe to use as an estimate
+ // We can also explore checking to see if rowBatch size is greater than
orc.rows.between.memory check, add just the maximum amount of rows
+ // such that the native file writer is saturated but not flushed, record
that memory then flush after. But that may be overkill for the time being.
+ if (this.selfTuningWriter) {
+ this.currentOrcWriterMaxUnderlyingMemory =
Math.max(this.currentOrcWriterMaxUnderlyingMemory,
orcFileWriter.estimateMemory());
+ }
rowBatch.reset();
}
}
@@ -258,8 +265,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
String.valueOf(this.orcFileWriterRowsBetweenCheck));
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_PREVIOUS_BATCH_SIZE,
this.batchSize);
-
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY,
- this.currentOrcWriterMaxUnderlyingMemory != -1 ?
this.currentOrcWriterMaxUnderlyingMemory : orcFileWriter.estimateMemory());
+
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY,
this.currentOrcWriterMaxUnderlyingMemory);
}
}