waitinfuture commented on code in PR #2130:
URL: https://github.com/apache/incubator-celeborn/pull/2130#discussion_r1438997054
##########
common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala:
##########
@@ -54,8 +54,19 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L)
val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L)
- val fileInfo1 = new FileInfo("/tmp/1", chunkOffsets1, userIdentifier1)
- val fileInfo2 = new FileInfo("/tmp/2", chunkOffsets2, userIdentifier2)
+ val fileInfo1 = new DiskFileInfo(
+ userIdentifier1,
+ true,
+ new ReduceFileMeta(chunkOffsets1),
+ file1.getAbsolutePath)
+ // "/tmp/1", chunkOffsets1, userIdentifier1)
Review Comment:
Better to delete these lines instead of commenting them out.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java:
##########
@@ -323,7 +327,8 @@ private void updateConsumingOffset() throws IOException {
if (dataConsumingOffset < 0
    || dataConsumingOffset + currentPartitionRemainingBytes > dataFileChannel.size()
    || currentPartitionRemainingBytes < 0) {
- throw new FileCorruptedException("File " + fileInfo.getFilePath() + " is corrupted");
+ throw new FileCorruptedException(
+     "File " + ((DiskFileInfo) fileInfo).getFilePath() + " is corrupted");
Review Comment:
Better to change the type of `fileInfo` to `DiskFileInfo`, as `MapPartitionData` does, to avoid these type conversions.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -230,32 +230,40 @@ class FetchHandler(
client,
rpcRequestId,
-1,
- fileInfo.numChunks(),
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks,
isLegacy,
- fileInfo.getChunkOffsets,
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getChunkOffsets,
fileInfo.getFilePath)
} else if (fileInfo.isHdfs) {
replyStreamHandler(client, rpcRequestId, streamId, numChunks = 0, isLegacy)
} else {
- val buffers = new FileManagedBuffers(fileInfo, transportConf)
- val fetchTimeMetrics = storageManager.getFetchTimeMetric(fileInfo.getFile)
+ val buffers =
+ new FileManagedBuffers(fileInfo, transportConf)
+ val fetchTimeMetrics =
+ storageManager.getFetchTimeMetric(fileInfo.getFile)
chunkStreamManager.registerStream(
streamId,
shuffleKey,
buffers,
fileName,
fetchTimeMetrics)
- if (fileInfo.numChunks() == 0)
+ if (fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks == 0)
Review Comment:
ditto
##########
common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala:
##########
@@ -54,8 +54,19 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L)
val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L)
- val fileInfo1 = new FileInfo("/tmp/1", chunkOffsets1, userIdentifier1)
- val fileInfo2 = new FileInfo("/tmp/2", chunkOffsets2, userIdentifier2)
+ val fileInfo1 = new DiskFileInfo(
+ userIdentifier1,
+ true,
+ new ReduceFileMeta(chunkOffsets1),
+ file1.getAbsolutePath)
+ // "/tmp/1", chunkOffsets1, userIdentifier1)
+ // val fileInfo1 = new FileInfo("/tmp/1", chunkOffsets1, userIdentifier1)
+ val fileInfo2 = new DiskFileInfo(
+ userIdentifier2,
+ true,
+ new ReduceFileMeta(chunkOffsets2),
+ file2.getAbsolutePath)
+ // "/tmp/2", chunkOffsets2, userIdentifier2)
Review Comment:
ditto
##########
common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala:
##########
@@ -138,10 +149,11 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val pbFileInfo = PbSerDeUtils.toPbFileInfo(fileInfo1)
val restoredFileInfo = PbSerDeUtils.fromPbFileInfo(pbFileInfo)
- assert(restoredFileInfo.getFilePath.equals(fileInfo1.getFilePath))
- assert(restoredFileInfo.getChunkOffsets.equals(fileInfo1.getChunkOffsets))
+ assert(
+ restoredFileInfo.getFilePath.equals(fileInfo1.getFilePath))
+ assert(restoredFileInfo.getChunkOffsets.equals(
+ fileInfo1.getChunkOffsets))
assert(restoredFileInfo.getUserIdentifier.equals(fileInfo1.getUserIdentifier))
- assert(restoredFileInfo.getPartitionType.equals(fileInfo1.getPartitionType))
Review Comment:
Instead of just deleting this line, it might be better to change the assert to something like `restoredFileInfo.getFileMeta.isInstanceOf[ReduceFileMeta]` / `[MapFileMeta]`.
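For example (just a sketch; the `getFileMeta` accessor is the one this PR adds, and `restoredMapFileInfo` is a hypothetical MAP-partition fixture):
```
// Verify the restored meta keeps its concrete type instead of only
// comparing individual fields (names assumed as noted above).
assert(restoredFileInfo.getFileMeta.isInstanceOf[ReduceFileMeta])
assert(restoredMapFileInfo.getFileMeta.isInstanceOf[MapFileMeta])
```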
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java:
##########
@@ -63,45 +65,48 @@ class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
private int minBuffersToTriggerRead;
private AtomicBoolean hasReadingTask = new AtomicBoolean(false);
- public MapDataPartition(
+ public MapPartitionData(
int minReadBuffers,
int maxReadBuffers,
HashMap<String, ExecutorService> storageFetcherPool,
int threadsPerMountPoint,
- FileInfo fileInfo,
+ DiskFileInfo diskFileInfo,
Consumer<Long> recycleStream,
int minBuffersToTriggerRead)
throws IOException {
this.recycleStream = recycleStream;
- this.fileInfo = fileInfo;
+ this.diskFileInfo = diskFileInfo;
+ this.mapFileMeta = (MapFileMeta) diskFileInfo.getFileMeta();
this.minReadBuffers = minReadBuffers;
this.maxReadBuffers = maxReadBuffers;
updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
logger.debug(
"read map partition {} with {} {}",
- fileInfo.getFilePath(),
+ ((DiskFileInfo) diskFileInfo).getFilePath(),
Review Comment:
Unnecessary type conversion.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java:
##########
@@ -402,7 +407,7 @@ private void notifyBacklog(int backlog) {
private void notifyError(Throwable throwable) {
logger.error(
"Read file: {} error from {}, stream id {}",
- fileInfo.getFilePath(),
+ ((DiskFileInfo) fileInfo).getFilePath(),
Review Comment:
ditto
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -31,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED
import org.apache.celeborn.common.exception.CelebornIOException
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{FileInfo, FileManagedBuffers}
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, FileManagedBuffers, MapFileMeta, ReduceFileMeta}
Review Comment:
`FileInfo` can be removed
##########
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:
##########
@@ -86,38 +86,50 @@ object PbSerDeUtils {
.setStorageType(diskInfo.storageType.getValue)
.build
- def fromPbFileInfo(pbFileInfo: PbFileInfo): FileInfo =
+ def fromPbFileInfo(pbFileInfo: PbFileInfo): DiskFileInfo =
fromPbFileInfo(pbFileInfo, fromPbUserIdentifier(pbFileInfo.getUserIdentifier))
- def fromPbFileInfo(pbFileInfo: PbFileInfo, userIdentifier: UserIdentifier) =
- new FileInfo(
- pbFileInfo.getFilePath,
- pbFileInfo.getChunkOffsetsList,
+ def fromPbFileInfo(pbFileInfo: PbFileInfo, userIdentifier: UserIdentifier) = {
+ val meta = Utils.toPartitionType(pbFileInfo.getPartitionType) match {
+ case PartitionType.REDUCE =>
+ new ReduceFileMeta(pbFileInfo.getChunkOffsetsList)
+ case PartitionType.MAP =>
+ new MapFileMeta(pbFileInfo.getBufferSize, pbFileInfo.getNumSubpartitions)
+ case PartitionType.MAPGROUP =>
+ throw new NotImplementedError("Map group is not implemented")
+ }
+ new DiskFileInfo(
userIdentifier,
- Utils.toPartitionType(pbFileInfo.getPartitionType),
- pbFileInfo.getBufferSize,
- pbFileInfo.getNumSubpartitions,
- pbFileInfo.getBytesFlushed,
Review Comment:
Seems we lost `pbFileInfo.getBytesFlushed` here; it will return a wrong value when `getFileLength`, `updateBytesFlushed`, and `getBytesFlushed` are called.
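For example, something like this would keep the value (a sketch only; the `DiskFileInfo` constructor arguments are assumed from this PR, and `updateBytesFlushed` is assumed to add to a counter that starts at zero):
```
val diskFileInfo = new DiskFileInfo(
  userIdentifier,
  pbFileInfo.getPartitionSplitEnabled,
  meta,
  pbFileInfo.getFilePath)
// Restore the flushed byte count so getFileLength/getBytesFlushed
// report the right value after deserialization.
diskFileInfo.updateBytesFlushed(pbFileInfo.getBytesFlushed)
diskFileInfo
```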
##########
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala:
##########
@@ -44,7 +44,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.PORT_MAX_RETRY
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{DiskStatus, FileInfo, WorkerInfo}
+import org.apache.celeborn.common.meta.{DiskFileInfo, DiskStatus, FileInfo, WorkerInfo}
Review Comment:
`DiskFileInfo` and `FileInfo` are not used.
##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -498,6 +499,25 @@ message PbFileInfo {
bool partitionSplitEnabled = 8;
}
+message PbMapFileMeta {
+ int32 bufferSize = 1;
+ int32 numSubPartitions = 2;
+}
+
+message PbReduceFileMeta {
+ repeated int64 chunkOffsets = 1;
+ bool sorted = 2;
+}
+
+message PbNonMemoryFile {
Review Comment:
Maybe `PbDiskFileInfo`?
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java:
##########
@@ -63,45 +65,48 @@ class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
private int minBuffersToTriggerRead;
private AtomicBoolean hasReadingTask = new AtomicBoolean(false);
- public MapDataPartition(
+ public MapPartitionData(
int minReadBuffers,
int maxReadBuffers,
HashMap<String, ExecutorService> storageFetcherPool,
int threadsPerMountPoint,
- FileInfo fileInfo,
+ DiskFileInfo diskFileInfo,
Consumer<Long> recycleStream,
int minBuffersToTriggerRead)
throws IOException {
this.recycleStream = recycleStream;
- this.fileInfo = fileInfo;
+ this.diskFileInfo = diskFileInfo;
+ this.mapFileMeta = (MapFileMeta) diskFileInfo.getFileMeta();
this.minReadBuffers = minReadBuffers;
this.maxReadBuffers = maxReadBuffers;
updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
logger.debug(
"read map partition {} with {} {}",
- fileInfo.getFilePath(),
+ ((DiskFileInfo) diskFileInfo).getFilePath(),
bufferQueue.getLocalBuffersTarget(),
- fileInfo.getBufferSize());
+ mapFileMeta.getBufferSize());
this.minBuffersToTriggerRead = minBuffersToTriggerRead;
readExecutor =
storageFetcherPool.computeIfAbsent(
- fileInfo.getMountPoint(),
+ mapFileMeta.getMountPoint(),
k ->
Executors.newFixedThreadPool(
threadsPerMountPoint,
new ThreadFactoryBuilder()
- .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+ .setNameFormat(mapFileMeta.getMountPoint() + "-reader-thread-%d")
.setUncaughtExceptionHandler(
(t1, t2) -> {
logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
})
.build()));
- this.dataFileChanel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
- this.indexChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
+ this.dataFileChanel =
+     FileChannelUtils.openReadableFileChannel(((DiskFileInfo) diskFileInfo).getFilePath());
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java:
##########
@@ -63,45 +65,48 @@ class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
private int minBuffersToTriggerRead;
private AtomicBoolean hasReadingTask = new AtomicBoolean(false);
- public MapDataPartition(
+ public MapPartitionData(
int minReadBuffers,
int maxReadBuffers,
HashMap<String, ExecutorService> storageFetcherPool,
int threadsPerMountPoint,
- FileInfo fileInfo,
+ DiskFileInfo diskFileInfo,
Consumer<Long> recycleStream,
int minBuffersToTriggerRead)
throws IOException {
this.recycleStream = recycleStream;
- this.fileInfo = fileInfo;
+ this.diskFileInfo = diskFileInfo;
+ this.mapFileMeta = (MapFileMeta) diskFileInfo.getFileMeta();
this.minReadBuffers = minReadBuffers;
this.maxReadBuffers = maxReadBuffers;
updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
logger.debug(
"read map partition {} with {} {}",
- fileInfo.getFilePath(),
+ ((DiskFileInfo) diskFileInfo).getFilePath(),
bufferQueue.getLocalBuffersTarget(),
- fileInfo.getBufferSize());
+ mapFileMeta.getBufferSize());
this.minBuffersToTriggerRead = minBuffersToTriggerRead;
readExecutor =
storageFetcherPool.computeIfAbsent(
- fileInfo.getMountPoint(),
+ mapFileMeta.getMountPoint(),
k ->
Executors.newFixedThreadPool(
threadsPerMountPoint,
new ThreadFactoryBuilder()
- .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+ .setNameFormat(mapFileMeta.getMountPoint() + "-reader-thread-%d")
.setUncaughtExceptionHandler(
(t1, t2) -> {
logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
})
.build()));
- this.dataFileChanel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
- this.indexChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
+ this.dataFileChanel =
+     FileChannelUtils.openReadableFileChannel(((DiskFileInfo) diskFileInfo).getFilePath());
+ this.indexChannel =
+     FileChannelUtils.openReadableFileChannel(((DiskFileInfo) diskFileInfo).getIndexPath());
Review Comment:
ditto
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -230,32 +230,40 @@ class FetchHandler(
client,
rpcRequestId,
-1,
- fileInfo.numChunks(),
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks,
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java:
##########
@@ -334,7 +339,7 @@ private boolean readBuffer(ByteBuf buffer) throws IOException {
int readSize =
readBuffer(
- fileInfo.getFilePath(),
+ ((DiskFileInfo) fileInfo).getFilePath(),
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java:
##########
@@ -256,20 +261,20 @@ public void close() {
@Override
public String toString() {
- return "MapDataPartition{" + "fileInfo=" + fileInfo.getFilePath() + '}';
+ return "MapDataPartition{" + "fileInfo=" + ((DiskFileInfo) diskFileInfo).getFilePath() + '}';
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -501,11 +504,14 @@ public FileInfo resolve(
IOUtils.closeQuietly(hdfsIndexStream, null);
}
}
- return new FileInfo(
+ return new DiskFileInfo(
Review Comment:
Better to provide a constructor like
```
public DiskFileInfo(
UserIdentifier userIdentifier,
FileMeta fileMeta,
String filePath)
```
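Call sites like this one would then reduce to something like (sketch; `sortedFileMeta` and `sortedFilePath` are placeholder names for the values already computed in `resolve`):
```
// Hypothetical call site once the three-arg constructor exists.
new DiskFileInfo(userIdentifier, sortedFileMeta, sortedFilePath)
```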
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java:
##########
@@ -347,7 +352,7 @@ private boolean readBuffer(ByteBuf buffer) throws IOException {
currentPartitionRemainingBytes,
readSize,
dataConsumingOffset,
- fileInfo.getFilePath(),
+ ((DiskFileInfo) fileInfo).getFilePath(),
Review Comment:
ditto
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -192,8 +192,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
// shuffleKey -> (fileName -> file info)
- private val fileInfos =
- JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, FileInfo]]()
+ private val nonMemoryFileInfos =
Review Comment:
`diskFileInfos`
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -136,12 +141,20 @@ public FileWriter(
takeBuffer();
}
- public FileInfo getFileInfo() {
- return fileInfo;
+ public FileInfo getDiskFileInfo() {
+ return diskFileInfo;
+ }
+
+ public DiskFileInfo getNonMemoryFileInfo() {
Review Comment:
`getDiskFileInfo`
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -230,32 +230,40 @@ class FetchHandler(
client,
rpcRequestId,
-1,
- fileInfo.numChunks(),
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks,
isLegacy,
- fileInfo.getChunkOffsets,
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getChunkOffsets,
Review Comment:
ditto
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -204,9 +204,9 @@ class FetchHandler(
callback: RpcResponseCallback): Unit = {
workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
try {
- var fileInfo = getRawFileInfo(shuffleKey, fileName)
- fileInfo.getPartitionType match {
- case PartitionType.REDUCE =>
+ var fileInfo = getRawNonMemoryFileInfo(shuffleKey, fileName)
+ fileInfo.getFileMeta match {
+ case _: ReduceFileMeta =>
Review Comment:
Better to change this to `meta: ReduceFileMeta` so the typed value can be used directly inside the branch instead of repeatedly calling `fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta]`.
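i.e. something like (a sketch; the branch bodies are reduced to log lines just for illustration):
```
fileInfo.getFileMeta match {
  case meta: ReduceFileMeta =>
    // meta is already typed here, no asInstanceOf needed
    logDebug(s"numChunks ${meta.getNumChunks}")
  case meta: MapFileMeta =>
    logDebug(s"bufferSize ${meta.getBufferSize}")
}
```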
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -343,149 +349,63 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
partitionType: PartitionType,
rangeReadFilter: Boolean,
userIdentifier: UserIdentifier,
- partitionSplitEnabled: Boolean): FileWriter = {
+ partitionSplitEnabled: Boolean): PartitionDataWriter = {
if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) {
throw new IOException("No available working dirs!")
}
-
- val fileName = location.getFileName
- var retryCount = 0
- var exception: IOException = null
- val suggestedMountPoint = location.getStorageInfo.getMountPoint
- while (retryCount < conf.workerCreateWriterMaxAttempts) {
- val diskInfo = diskInfos.get(suggestedMountPoint)
- val dirs =
- if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
- diskInfo.dirs
- } else {
- logDebug(s"Disk unavailable for $suggestedMountPoint, return all healthy" +
- s" working dirs. diskInfo $diskInfo")
- healthyWorkingDirs()
- }
- if (dirs.isEmpty && hdfsFlusher.isEmpty) {
- throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
- }
- val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
- if ((dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) || location.getStorageInfo.HDFSOnly()) {
- val shuffleDir =
- new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
- val fileInfo =
- new FileInfo(
- new Path(shuffleDir, fileName).toString,
- userIdentifier,
- partitionType,
- partitionSplitEnabled)
- fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
- FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
- val hdfsWriter = partitionType match {
- case PartitionType.MAP => new MapPartitionFileWriter(
- fileInfo,
- hdfsFlusher.get,
+ val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
+ val (flusher, diskFileInfo, workingDir) = createFile(
+ location,
+ appId,
+ shuffleId,
+ location.getFileName,
+ userIdentifier,
+ partitionType,
+ partitionSplitEnabled)
+ val writer =
+ try {
+ partitionType match {
+ case PartitionType.MAP => new MapPartitionDataWriter(
+ this,
+ diskFileInfo,
+ flusher,
workerSource,
conf,
deviceMonitor,
splitThreshold,
splitMode,
- rangeReadFilter)
- case PartitionType.REDUCE => new ReducePartitionFileWriter(
- fileInfo,
- hdfsFlusher.get,
+ rangeReadFilter,
+ shuffleKey)
+ case PartitionType.REDUCE => new ReducePartitionDataWriter(
+ this,
+ diskFileInfo,
+ flusher,
workerSource,
conf,
deviceMonitor,
splitThreshold,
splitMode,
- rangeReadFilter)
+ rangeReadFilter,
+ shuffleKey)
case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
- if (workerGracefulShutdown) {
- hdfsWriter.setStorageManager(this)
- hdfsWriter.setShuffleKey(shuffleKey)
- }
- hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
- return hdfsWriter
- } else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
- val dir = dirs(getNextIndex() % dirs.size)
- val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints)
- val shuffleDir = new File(dir, s"$appId/$shuffleId")
- val file = new File(shuffleDir, fileName)
- try {
- val fileInfo =
- new FileInfo(
- file.getAbsolutePath,
- userIdentifier,
- partitionType,
- partitionSplitEnabled)
- fileInfo.setMountPoint(mountPoint)
- fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
- shuffleDir.mkdirs()
- if (file.exists()) {
- throw new FileAlreadyExistsException(
- s"Shuffle data file ${file.getAbsolutePath} already exists.")
- } else {
- val createFileSuccess = file.createNewFile()
- if (!createFileSuccess) {
- throw new CelebornException(
- s"Create shuffle data file ${file.getAbsolutePath} failed!")
- }
- }
- val fileWriter = partitionType match {
- case PartitionType.MAP => new MapPartitionFileWriter(
- fileInfo,
- localFlushers.get(mountPoint),
- workerSource,
- conf,
- deviceMonitor,
- splitThreshold,
- splitMode,
- rangeReadFilter)
- case PartitionType.REDUCE => new ReducePartitionFileWriter(
- fileInfo,
- localFlushers.get(mountPoint),
- workerSource,
- conf,
- deviceMonitor,
- splitThreshold,
- splitMode,
- rangeReadFilter)
- case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet")
- }
- if (workerGracefulShutdown) {
- fileWriter.setStorageManager(this)
- fileWriter.setShuffleKey(shuffleKey)
- }
- deviceMonitor.registerFileWriter(fileWriter)
- val map = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc)
- map.put(fileInfo.getFilePath, fileWriter)
- location.getStorageInfo.setMountPoint(mountPoint)
- logDebug(s"location $location set disk hint to ${location.getStorageInfo} ")
- return fileWriter
- } catch {
- case fe: FileAlreadyExistsException =>
- logError("Failed to create fileWriter because of existed file", fe)
- throw fe
- case t: Throwable =>
- logError(
- s"Create FileWriter for ${file.getAbsolutePath} of mount $mountPoint " +
- s"failed, report to DeviceMonitor",
- t)
- exception = new IOException(t)
- deviceMonitor.reportNonCriticalError(
- mountPoint,
- exception,
- DiskStatus.READ_OR_WRITE_FAILURE)
- }
- } else {
- exception = new IOException("No storage available for location:" + location.toString)
+ } catch {
+ case e: Exception =>
+ logError("Create partition data writer failed", e)
+ throw e
}
- retryCount += 1
+ if (writer.getDiskFileInfo.isInstanceOf[DiskFileInfo] && !(writer.getDiskFileInfo.asInstanceOf[
Review Comment:
It's guaranteed to be of type `DiskFileInfo`, so it's unnecessary to check again.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -230,32 +230,40 @@ class FetchHandler(
client,
rpcRequestId,
-1,
- fileInfo.numChunks(),
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks,
isLegacy,
- fileInfo.getChunkOffsets,
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getChunkOffsets,
fileInfo.getFilePath)
} else if (fileInfo.isHdfs) {
replyStreamHandler(client, rpcRequestId, streamId, numChunks = 0, isLegacy)
} else {
- val buffers = new FileManagedBuffers(fileInfo, transportConf)
- val fetchTimeMetrics = storageManager.getFetchTimeMetric(fileInfo.getFile)
+ val buffers =
+ new FileManagedBuffers(fileInfo, transportConf)
+ val fetchTimeMetrics =
+ storageManager.getFetchTimeMetric(fileInfo.getFile)
chunkStreamManager.registerStream(
streamId,
shuffleKey,
buffers,
fileName,
fetchTimeMetrics)
- if (fileInfo.numChunks() == 0)
+ if (fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks == 0)
logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
s"[$startIndex-$endIndex] is empty. Received from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
else logDebug(
- s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.numChunks()}, " +
+ s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.getFileMeta.asInstanceOf[
+   ReduceFileMeta].getNumChunks()}, " +
s"mapRange [$startIndex-$endIndex]. Received from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
- replyStreamHandler(client, rpcRequestId, streamId, fileInfo.numChunks(), isLegacy)
+ replyStreamHandler(
+ client,
+ rpcRequestId,
+ streamId,
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks,
Review Comment:
ditto
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -74,11 +74,11 @@ class FetchHandler(
this.registered = worker.registered
}
- def getRawFileInfo(
+ def getRawNonMemoryFileInfo(
Review Comment:
`getRawDiskFileInfo`
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1187,13 +1187,13 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
|diskFull:$diskFull,
|partitionSplitMinimumSize:$partitionSplitMinimumSize,
|splitThreshold:${fileWriter.getSplitThreshold()},
- |fileLength:${fileWriter.getFileInfo.getFileLength}
- |fileName:${fileWriter.getFileInfo.getFilePath}
+ |fileLength:${fileWriter.getDiskFileInfo.getFileLength}
+ |fileName:${fileWriter.getDiskFileInfo.asInstanceOf[DiskFileInfo].getFilePath}
Review Comment:
ditto
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -230,32 +230,40 @@ class FetchHandler(
client,
rpcRequestId,
-1,
- fileInfo.numChunks(),
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks,
isLegacy,
- fileInfo.getChunkOffsets,
+ fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getChunkOffsets,
fileInfo.getFilePath)
} else if (fileInfo.isHdfs) {
replyStreamHandler(client, rpcRequestId, streamId, numChunks = 0, isLegacy)
} else {
- val buffers = new FileManagedBuffers(fileInfo, transportConf)
- val fetchTimeMetrics = storageManager.getFetchTimeMetric(fileInfo.getFile)
+ val buffers =
+ new FileManagedBuffers(fileInfo, transportConf)
+ val fetchTimeMetrics =
+ storageManager.getFetchTimeMetric(fileInfo.getFile)
chunkStreamManager.registerStream(
streamId,
shuffleKey,
buffers,
fileName,
fetchTimeMetrics)
- if (fileInfo.numChunks() == 0)
+ if (fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks == 0)
logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
s"[$startIndex-$endIndex] is empty. Received from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
else logDebug(
- s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.numChunks()}, " +
+ s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.getFileMeta.asInstanceOf[
+   ReduceFileMeta].getNumChunks()}, " +
Review Comment:
ditto
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -261,7 +261,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
if (fileWriter.isClosed) {
logWarning(
- s"[handlePushData] FileWriter is already closed! File path ${fileWriter.getFileInfo.getFilePath}")
+ s"[handlePushData] FileWriter is already closed! File path ${fileWriter.getDiskFileInfo.asInstanceOf[DiskFileInfo].getFilePath}")
Review Comment:
Needless conversion
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1203,8 +1203,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
|diskFull:$diskFull,
|partitionSplitMinimumSize:$partitionSplitMinimumSize,
|splitThreshold:${fileWriter.getSplitThreshold()},
- |fileLength:${fileWriter.getFileInfo.getFileLength},
- |fileName:${fileWriter.getFileInfo.getFilePath}
+ |fileLength:${fileWriter.getDiskFileInfo.getFileLength},
+ |fileName:${fileWriter.getDiskFileInfo.asInstanceOf[DiskFileInfo].getFilePath}
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -136,12 +141,20 @@ public FileWriter(
takeBuffer();
}
- public FileInfo getFileInfo() {
- return fileInfo;
+ public FileInfo getDiskFileInfo() {
Review Comment:
Better to change the return type to `DiskFileInfo`
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -796,7 +796,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
if (fileWriter.isClosed) {
logWarning(
- s"[handleMapPartitionPushData] FileWriter is already closed! File path ${fileWriter.getFileInfo.getFilePath}")
+ s"[handleMapPartitionPushData] FileWriter is already closed! File path ${fileWriter.getDiskFileInfo.asInstanceOf[DiskFileInfo].getFilePath}")
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]