Shuaiqi Ge created SPARK-27852:
----------------------------------
Summary: One updateBytesWritten operaton may be missed in
DiskBlockObjectWriter.scala
Key: SPARK-27852
URL: https://issues.apache.org/jira/browse/SPARK-27852
Project: Spark
Issue Type: Question
Components: Spark Core
Affects Versions: 2.4.3
Reporter: Shuaiqi Ge
In *_DiskBlockObjectWriter.scala_* there are two overloaded *_write_* functions. The first one calls _*recordWritten*_ (which in turn calls _*updateBytesWritten*_ every 16384 records), while the second one does not. I think writeMetrics should record all write operations, since some of this data is displayed in the Spark jobs UI, such as the shuffle read and shuffle write sizes.
{code:scala}
def write(key: Any, value: Any) {
  if (!streamOpen) {
    open()
  }

  objOut.writeKey(key)
  objOut.writeValue(value)
  recordWritten()
}

override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
  if (!streamOpen) {
    open()
  }

  bs.write(kvBytes, offs, len)
}

/**
 * Notify the writer that a record worth of bytes has been written with
 * OutputStream#write.
 */
def recordWritten(): Unit = {
  numRecordsWritten += 1
  writeMetrics.incRecordsWritten(1)

  if (numRecordsWritten % 16384 == 0) {
    updateBytesWritten()
  }
}

/**
 * Report the number of bytes written in this writer's shuffle write metrics.
 * Note that this is only valid before the underlying streams are closed.
 */
private def updateBytesWritten() {
  val pos = channel.position()
  writeMetrics.incBytesWritten(pos - reportedPosition)
  reportedPosition = pos
}
{code}
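To illustrate the batching behavior being questioned, here is a minimal, self-contained sketch (the class and method names below are hypothetical, not Spark API): the bytes-written metric is only folded in every 16384 records, and only on the path that calls recordWritten(); bytes pushed through the other write path lag behind until the next update (in the real writer, commitAndGet()/close() also reconcile the metric).

{code:scala}
// Hypothetical sketch of the batched metric update in DiskBlockObjectWriter.
class MetricsSketch {
  private var numRecordsWritten = 0L
  private var reportedPosition = 0L // bytes already counted in the metric
  private var position = 0L         // bytes actually written to the stream
  var bytesWrittenMetric = 0L       // stands in for writeMetrics.bytesWritten

  private def updateBytesWritten(): Unit = {
    bytesWrittenMetric += position - reportedPosition
    reportedPosition = position
  }

  // Mirrors write(key, value): each record triggers recordWritten(),
  // which updates the byte metric every 16384 records.
  def writeRecord(sizeInBytes: Long): Unit = {
    position += sizeInBytes
    numRecordsWritten += 1
    if (numRecordsWritten % 16384 == 0) updateBytesWritten()
  }

  // Mirrors write(kvBytes, offs, len): no recordWritten() call,
  // so these bytes are not reflected in the metric until the next update.
  def writeBytes(len: Long): Unit = {
    position += len
  }

  // Stands in for the reconciliation done when the writer is committed.
  def commit(): Unit = updateBytesWritten()
}
{code}

Under this model, bytes written through the second path are eventually counted at commit time, but the metric can be stale in between, which matches the discrepancy described above.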
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]