RexXiong commented on code in PR #2130:
URL: https://github.com/apache/incubator-celeborn/pull/2130#discussion_r1432731301
##########
common/src/main/java/org/apache/celeborn/common/meta/FileMeta.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.List;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+public interface FileMeta {
+  default List<Long> getChunkOffsets() {
+    throw new NotImplementedException(
+        this.getClass().getSimpleName() + " did not implement this method");
+  }
+
+  default void setBufferSize(int bufferSize) {
+    throw new NotImplementedException(
+        this.getClass().getSimpleName() + " did not implement this method");
+  }
+
+  default void setNumSubpartitions(int numSubpartitions) {
+    throw new NotImplementedException(
+        this.getClass().getSimpleName() + " did not implement this method");
+  }
+
+  default int getBufferSize() {
+    throw new NotImplementedException(
+        this.getClass().getSimpleName() + " did not implement this method");
+  }
+
+  default int getNumSubPartitions() {
+    throw new NotImplementedException(
+        this.getClass().getSimpleName() + " did not implement this method");
+  }
+
+  default void addChunkOffset(long offset) {
+    throw new NotImplementedException(
+        this.getClass().getSimpleName() + " did not implement this method");
+  }
+
+  default void setSorted() {
+    throw new NotImplementedException(
+        this.getClass().getSimpleName() + " did not implement this method");
+  }
+
+  default boolean getSorted() {
+    throw new NotImplementedException(

Review Comment:
   This can directly return false; then we can call it directly in NonMemoryFileInfo.
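   A minimal sketch of that suggestion (treating "not implemented" as "not
   sorted" by default; only the method body below changes):

       default boolean getSorted() {
         // Safe fallback: a meta that never sorts reports false, so callers
         // such as NonMemoryFileInfo.addStream need no exception guard.
         return false;
       }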
##########
common/src/main/java/org/apache/celeborn/common/meta/NonMemoryFileInfo.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class NonMemoryFileInfo extends FileInfo {
+  private static Logger logger = LoggerFactory.getLogger(NonMemoryFileInfo.class);
+  private final Set<Long> streams = ConcurrentHashMap.newKeySet();
+  private String filePath;
+  private StorageInfo.Type storageType;
+  private volatile long bytesFlushed;
+
+  public NonMemoryFileInfo(
+      UserIdentifier userIdentifier,
+      boolean partitionSplitEnabled,
+      FileMeta fileMeta,
+      String filePath,
+      StorageInfo.Type storageType) {
+    super(userIdentifier, partitionSplitEnabled, fileMeta);
+    this.filePath = filePath;
+    this.storageType = storageType;
+  }
+
+  public NonMemoryFileInfo(
+      UserIdentifier userIdentifier,
+      boolean partitionSplitEnabled,
+      FileMeta fileMeta,
+      String filePath) {
+    super(userIdentifier, partitionSplitEnabled, fileMeta);
+    this.filePath = filePath;
+    // assume that a fileinfo reloaded from pb is local file
+    this.storageType = StorageInfo.Type.HDD;
+  }
+
+  public NonMemoryFileInfo(File file, UserIdentifier userIdentifier) {
+    this(
+        userIdentifier,
+        true,
+        new ReduceFileMeta(new ArrayList(Arrays.asList(0L))),
+        file.getAbsolutePath(),
+        StorageInfo.Type.HDD);
+  }
+
+  public boolean addStream(long streamId) {
+    synchronized (fileMeta) {
+      if (fileMeta.getSorted()) {
+        return false;
+      } else {
+        streams.add(streamId);
+        return true;
+      }
+    }
+  }
+
+  public void closeStream(long streamId) {
+    synchronized (fileMeta) {
+      streams.remove(streamId);
+    }
+  }
+
+  public boolean isStreamsEmpty() {
+    synchronized (fileMeta) {
+      return streams.isEmpty();
+    }
+  }
+
+  public long getFileLength() {
+    return bytesFlushed;
+  }
+
+  public long updateBytesFlushed(long numBytes) {
+    bytesFlushed += numBytes;
+    return bytesFlushed;
+  }
+
+  public File getFile() {
+    return new File(filePath);
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public String getSortedPath() {
+    return Utils.getSortedFilePath(filePath);
+  }
+
+  public String getIndexPath() {
+    return Utils.getIndexFilePath(filePath);
+  }
+
+  public Path getHdfsPath() {
+    return new Path(filePath);
+  }
+
+  public Path getHdfsIndexPath() {
+    return new Path(Utils.getIndexFilePath(filePath));
+  }
+
+  public Path getHdfsSortedPath() {
+    return new Path(Utils.getSortedFilePath(filePath));
+  }
+
+  public Path getHdfsWriterSuccessPath() {
+    return new Path(Utils.getWriteSuccessFilePath(filePath));
+  }
+
+  public Path getHdfsPeerWriterSuccessPath() {
+    return new Path(Utils.getWriteSuccessFilePath(Utils.getPeerPath(filePath)));
+  }
+
+  public void deleteAllFiles(FileSystem hdfsFs) {
+    if (isHdfs()) {
+      try {
+        hdfsFs.delete(getHdfsPath(), false);
+        hdfsFs.delete(getHdfsWriterSuccessPath(), false);
+        hdfsFs.delete(getHdfsIndexPath(), false);
+        hdfsFs.delete(getHdfsSortedPath(), false);
+      } catch (Exception e) {
+        // ignore delete exceptions because some other workers might be deleting the directory
+        logger.debug(
+            "delete HDFS file {},{},{},{} failed {}",
+            getHdfsPath(),
+            getHdfsWriterSuccessPath(),
+            getHdfsIndexPath(),
+            getHdfsSortedPath(),
+            e);
+      }
+    } else {
+      getFile().delete();
+      new File(getIndexPath()).delete();
+      new File(getSortedPath()).delete();
+    }
+  }
+
+  public String getMountPoint() {
+    return fileMeta.getMountPoint();
+  }
+
+  public void setMountPoint(String mountPoint) {
+    fileMeta.setMountPoint(mountPoint);
+  }
+
+  public long getBytesFlushed() {
+    return bytesFlushed;
+  }
+
+  public boolean isHdfs() {
+    return Utils.isHdfsPath(filePath);
+  }
+
+  public synchronized List<Long> getChunkOffsets() {

Review Comment:
   IMO the methods that only apply to the map/reduce FileMeta should not be callable directly on FileInfo. We could add something like a supportChunkRead method to FileMeta that returns false by default, so callers can check it first.
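   A minimal sketch of that idea (the supportChunkRead name is the reviewer's
   suggestion; where to override it is illustrative):

       // In FileMeta: conservative default for metas without chunk offsets.
       default boolean supportChunkRead() {
         return false;
       }

       // In ReduceFileMeta: chunk offsets are tracked, so chunk reads work.
       @Override
       public boolean supportChunkRead() {
         return true;
       }

       // Callers check the capability before touching chunk-related methods:
       if (fileMeta.supportChunkRead()) {
         List<Long> offsets = fileMeta.getChunkOffsets();
       }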
##########
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala:
##########
@@ -1080,10 +1080,10 @@ object Utils extends Logging {
   }
 
   def getShortFormattedFileName(fileInfo: FileInfo): String = {
-    val parentFile = fileInfo.getFile.getParent
+    val parentFile = fileInfo.asInstanceOf[NonMemoryFileInfo].getFile.getParent

Review Comment:
   We already have so many path-related methods in NonMemoryFileInfo; maybe we can move this into that class later as well (just a reminder).
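   For illustration, such a move could start with a hypothetical helper on
   NonMemoryFileInfo (the name and placement are assumptions, not part of
   this PR):

       public String getParentPath() {
         // Lives beside the other path accessors, so call sites like
         // Utils.getShortFormattedFileName no longer need the asInstanceOf cast.
         return getFile().getParent();
       }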
##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReduceFileMeta implements FileMeta {
+  private List<Long> chunkOffsets;
+  private AtomicBoolean sorted = new AtomicBoolean(false);
+
+  public ReduceFileMeta() {
+    this.chunkOffsets = new ArrayList<>();
+    chunkOffsets.add(0L);
+  }
+
+  public ReduceFileMeta(List<Long> chunkOffsets) {
+    this.chunkOffsets = chunkOffsets;
+  }
+
+  @Override
+  public synchronized List<Long> getChunkOffsets() {
+    return chunkOffsets;
+  }
+
+  @Override
+  public synchronized void addChunkOffset(long offset) {
+    chunkOffsets.add(offset);
+  }
+
+  public synchronized long getLastChunkOffset() {
+    return chunkOffsets.get(chunkOffsets.size() - 1);
+  }
+
+  @Override
+  public synchronized int getNumChunks() {
+    if (chunkOffsets.isEmpty()) {
+      return 0;
+    } else {
+      return chunkOffsets.size() - 1;
+    }
+  }
+
+  @Override
+  public synchronized void setSorted() {
+    sorted.set(true);
+  }
+
+  @Override
+  public boolean getSorted() {
+    return sorted.get();
+  }
+
+  public synchronized int numChunks() {
+    if (!chunkOffsets.isEmpty()) {
+      return chunkOffsets.size() - 1;
+    } else {
+      return 0;
+    }
+  }

Review Comment:
   +1

##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReduceFileMeta implements FileMeta {
+  private List<Long> chunkOffsets;
+  private AtomicBoolean sorted = new AtomicBoolean(false);
+
+  public ReduceFileMeta() {
+    this.chunkOffsets = new ArrayList<>();
+    chunkOffsets.add(0L);
+  }
+
+  public ReduceFileMeta(List<Long> chunkOffsets) {
+    this.chunkOffsets = chunkOffsets;
+  }
+
+  @Override
+  public synchronized List<Long> getChunkOffsets() {
+    return chunkOffsets;
+  }
+
+  @Override
+  public synchronized void addChunkOffset(long offset) {
+    chunkOffsets.add(offset);
+  }
+

Review Comment:
   Missing @Override.

##########
common/src/main/java/org/apache/celeborn/common/meta/FileMeta.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.List;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+public interface FileMeta {
+  default List<Long> getChunkOffsets() {

Review Comment:
   IMO we can add a getPartitionType method to FileMeta; then we can eliminate type casts in several places.
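   A minimal sketch (assuming each FileMeta implementation knows its own
   PartitionType; the REDUCE value mirrors the MAP case used elsewhere in
   this PR):

       // In FileMeta:
       PartitionType getPartitionType();

       // In ReduceFileMeta:
       @Override
       public PartitionType getPartitionType() {
         return PartitionType.REDUCE;
       }

       // Callers branch on the declared type instead of instanceof checks:
       if (fileMeta.getPartitionType() == PartitionType.REDUCE) {
         List<Long> offsets = fileMeta.getChunkOffsets();
       }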
##########
common/src/main/java/org/apache/celeborn/common/meta/NonMemoryFileInfo.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class NonMemoryFileInfo extends FileInfo {
+  private static Logger logger = LoggerFactory.getLogger(NonMemoryFileInfo.class);
+  private final Set<Long> streams = ConcurrentHashMap.newKeySet();
+  private String filePath;
+  private StorageInfo.Type storageType;
+  private volatile long bytesFlushed;
+
+  public NonMemoryFileInfo(
+      UserIdentifier userIdentifier,
+      boolean partitionSplitEnabled,
+      FileMeta fileMeta,
+      String filePath,
+      StorageInfo.Type storageType) {
+    super(userIdentifier, partitionSplitEnabled, fileMeta);
+    this.filePath = filePath;
+    this.storageType = storageType;
+  }
+
+  public NonMemoryFileInfo(
+      UserIdentifier userIdentifier,
+      boolean partitionSplitEnabled,
+      FileMeta fileMeta,
+      String filePath) {
+    super(userIdentifier, partitionSplitEnabled, fileMeta);
+    this.filePath = filePath;
+    // assume that a fileinfo reloaded from pb is local file
+    this.storageType = StorageInfo.Type.HDD;
+  }
+
+  public NonMemoryFileInfo(File file, UserIdentifier userIdentifier) {

Review Comment:
   Is this constructor actually used anywhere? If not, we can delete it directly.

##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -343,53 +362,39 @@
       partitionType: PartitionType,
       rangeReadFilter: Boolean,
       userIdentifier: UserIdentifier,
-      partitionSplitEnabled: Boolean): FileWriter = {
+      partitionSplitEnabled: Boolean): PartitionDataWriter = {
     if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) {
       throw new IOException("No available working dirs!")
     }
-
-    val fileName = location.getFileName
-    var retryCount = 0
-    var exception: IOException = null
-    val suggestedMountPoint = location.getStorageInfo.getMountPoint
-    while (retryCount < conf.workerCreateWriterMaxAttempts) {
-      val diskInfo = diskInfos.get(suggestedMountPoint)
-      val dirs =
-        if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
-          diskInfo.dirs
-        } else {
-          logDebug(s"Disk unavailable for $suggestedMountPoint, return all healthy" +
-            s" working dirs. diskInfo $diskInfo")
-          healthyWorkingDirs()
-        }
-      if (dirs.isEmpty && hdfsFlusher.isEmpty) {
-        throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
-      }
-      val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
-      if ((dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) || location.getStorageInfo.HDFSOnly()) {
-        val shuffleDir =
-          new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
-        val fileInfo =
-          new FileInfo(
-            new Path(shuffleDir, fileName).toString,
-            userIdentifier,
-            partitionType,
-            partitionSplitEnabled)
-        fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
-        FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
-        val hdfsWriter = partitionType match {
-          case PartitionType.MAP => new MapPartitionFileWriter(
-            fileInfo,
-            hdfsFlusher.get,
+    val writer =
+      try {
+        partitionType match {
+          case PartitionType.MAP => new MapPartitionDataWriter(
+            this,
+            new CreateFileContext(
+              location,
+              appId,
+              shuffleId,
+              location.getFileName,
+              userIdentifier,
+              partitionType,
+              partitionSplitEnabled),

Review Comment:
   Maybe we can create the CreateFileContext first, shared by both the map and reduce partition writers.
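   A sketch of that refactor, hoisting the context out of the match (the
   CreateFileContext arguments come from the diff above; the reduce-side
   writer name and the shortened constructor calls are illustrative):

       // Build the context once; both writer kinds consume the same instance.
       val createFileContext = new CreateFileContext(
         location,
         appId,
         shuffleId,
         location.getFileName,
         userIdentifier,
         partitionType,
         partitionSplitEnabled)
       val writer = partitionType match {
         case PartitionType.MAP =>
           new MapPartitionDataWriter(this, createFileContext)
         case _ =>
           new ReducePartitionDataWriter(this, createFileContext)
       }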
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
