This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
     new ae4022235 [CELEBORN-2047] Support MapPartitionData on DFS
ae4022235 is described below

commit ae40222351cbeb1a9bdd398d461255a0739f3cac
Author: SteNicholas <programg...@163.com>
AuthorDate: Sat Jul 26 22:11:32 2025 +0800

    [CELEBORN-2047] Support MapPartitionData on DFS

    ### What changes were proposed in this pull request?

    Support `MapPartitionData` on DFS.

    ### Why are the changes needed?

    `MapPartitionData` currently works only on local storage and cannot be used on DFS. Supporting `MapPartitionData` on DFS lets MapPartition mode match the storage capabilities of ReducePartition mode.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    `WordCountTestWithHDFS`.

    Closes #3349 from SteNicholas/CELEBORN-2047.

    Authored-by: SteNicholas <programg...@163.com>
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../apache/celeborn/common/meta/DiskFileInfo.java  |   4 +
 .../org/apache/celeborn/common/meta/FileInfo.java  |   4 +
 .../org/apache/celeborn/common/util/Utils.scala    |   8 +-
 project/CelebornBuild.scala                        |   8 +-
 tests/flink-it/pom.xml                             |  31 ++++++
 .../celeborn/tests/flink/WordCountTest.scala       |  45 ++++++++-
 .../worker/storage/DfsPartitionDataReader.java     | 108 +++++++++++++++++++++
 .../worker/storage/LocalPartitionDataReader.java   | 101 +++++++++++++++++++
 .../deploy/worker/storage/MapPartitionData.java    |  15 +--
 .../worker/storage/MapPartitionDataReader.java     |  92 +++++-------------
 .../deploy/worker/storage/PartitionDataReader.java |  85 ++++++++++++++++
 .../worker/storage/PartitionMetaHandler.scala      |  29 +-----
 .../deploy/worker/storage/StorageManager.scala     |   4 +-
 .../service/deploy/worker/storage/TierWriter.scala |  52 +++++-----
 .../service/deploy/MiniClusterFeature.scala        |   6 +-
 15 files changed, 452 insertions(+), 140 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java index c7161483b..ab798eca3 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java @@ -164,4 +164,8 @@ public class DiskFileInfo extends FileInfo { public boolean isDFS() { return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || Utils.isHdfsPath(filePath); } + + public StorageInfo.Type getStorageType() { + return storageType; + } } diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index e81b72936..e8511f1bf 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -108,4 +108,8 @@ public abstract class FileInfo { } public abstract String getFilePath(); + + public boolean isReduceFileMeta() { + return isReduceFileMeta; + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 3fd090e77..aa719cff1 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -40,6 +40,7 @@ import com.google.protobuf.{ByteString, GeneratedMessageV3} import io.netty.channel.unix.Errors.NativeIoException import org.apache.commons.lang3.SystemUtils import org.apache.commons.lang3.time.FastDateFormat +import org.apache.hadoop.fs.FSDataInputStream
import org.roaringbitmap.RoaringBitmap import org.apache.celeborn.common.CelebornConf @@ -1175,12 +1176,11 @@ object Utils extends Logging { } @throws[IOException] - def checkFileIntegrity(fileChannel: FileChannel, length: Int): Unit = { - val remainingBytes = fileChannel.size - fileChannel.position + def checkFileIntegrity(remainingBytes: Long, length: Int, filePath: String): Unit = { if (remainingBytes < length) { logError( - s"File remaining bytes not not enough, remaining: ${remainingBytes}, wanted: ${length}.") - throw new RuntimeException(s"File is corrupted ${fileChannel}") + s"File remaining bytes not not enough, remaining: $remainingBytes, wanted: $length.") + throw new RuntimeException(s"File is corrupted $filePath") } } diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index cdce5973b..760aaf8fe 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -268,6 +268,8 @@ object Dependencies { ExclusionRule("org.apache.httpcomponents", "httpclient"), ExclusionRule("org.slf4j", "slf4j-log4j12") ) + val hadoopAuth = "org.apache.hadoop" % "hadoop-auth" % hadoopVersion + val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion val picocli = "info.picocli" % "picocli" % picocliVersion @@ -1284,7 +1286,11 @@ trait FlinkClientProjects { "org.apache.flink" % "flink-runtime" % flinkVersion % "test", flinkStreamingDependency, flinkClientsDependency, - flinkRuntimeWebDependency + flinkRuntimeWebDependency, + Dependencies.hadoopCommon % "test", + Dependencies.hadoopAuth % "test", + Dependencies.hadoopHdfs % "test->test;compile->compile", + Dependencies.jerseyServer % "test", ) ++ commonUnitTestDependencies, (Test / envVars) += ("FLINK_VERSION", flinkVersion) ) diff --git a/tests/flink-it/pom.xml b/tests/flink-it/pom.xml index 8664fbe94..b4ef70ea5 100644 --- a/tests/flink-it/pom.xml +++ b/tests/flink-it/pom.xml @@ -91,5 +91,36 @@ <version>${flink.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala index cc75a73f4..fa8860efc 100644 --- a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala +++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala @@ -18,6 +18,7 @@ package org.apache.celeborn.tests.flink import java.io.File +import java.nio.file.Files import scala.collection.JavaConverters._ @@ -26,13 +27,17 @@ import org.apache.flink.configuration.{Configuration, ExecutionOptions} import org.apache.flink.runtime.jobgraph.JobType import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode +import org.apache.flink.util.OperatingSystem +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hdfs.MiniDFSCluster import org.scalatest.BeforeAndAfterAll import org.scalatest.funsuite.AnyFunSuite import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.CelebornConf.{AUTH_ENABLED, INTERNAL_PORT_ENABLED} +import org.apache.celeborn.common.CelebornConf.{ACTIVE_STORAGE_TYPES, AUTH_ENABLED, HDFS_DIR, INTERNAL_PORT_ENABLED, WORKER_STORAGE_CREATE_FILE_POLICY} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.FallbackPolicy +import org.apache.celeborn.rest.v1.model.PartitionLocationData.StorageEnum import org.apache.celeborn.service.deploy.MiniClusterFeature import org.apache.celeborn.service.deploy.worker.Worker @@ -132,3 +137,41 @@ class WordCountTestWithAuthentication extends WordCountTestBase { override protected def getWorkerConf: Map[String, String] = authConfig override protected def getClientConf: Map[String, String] = Map(AUTH_ENABLED.key -> "true") } + +class WordCountTestWithHDFS extends WordCountTestBase { + + private var basePath: Path = _ + private var hdfsCluster: MiniDFSCluster = _ + + override protected def getMasterConf: Map[String, String] = Map() + override protected def getWorkerConf: Map[String, String] = hdfsConfig + override protected def getClientConf: Map[String, String] = hdfsConfig + + override def createWorker(map: Map[String, String]): Worker = { + super.createWorker(map, null) + } + + override def beforeAll(): Unit = { + assume(!OperatingSystem.isWindows) + val hdConf = new org.apache.hadoop.conf.Configuration() + val tmpDir = Files.createTempDirectory("celeborn-") + tmpDir.toFile.deleteOnExit() + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.toString) + hdfsCluster = new MiniDFSCluster.Builder(hdConf).build + basePath = new Path(hdfsCluster.getFileSystem.getUri.toString + "/test") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + if (hdfsCluster != null) { + hdfsCluster.getFileSystem.delete(basePath, true) + hdfsCluster.shutdown() + } + } + + private def hdfsConfig = Map( + ACTIVE_STORAGE_TYPES.key -> StorageEnum.HDFS.getValue, + WORKER_STORAGE_CREATE_FILE_POLICY.key -> StorageEnum.HDFS.getValue, + HDFS_DIR.key -> basePath.toString) +} diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java new file mode 100644 index 000000000..c461e8366 --- /dev/null +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.storage; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; + +import org.apache.celeborn.common.meta.DiskFileInfo; +import org.apache.celeborn.common.protocol.StorageInfo; +import org.apache.celeborn.common.util.Utils; + +public class DfsPartitionDataReader extends PartitionDataReader { + + private final FSDataInputStream dataInputStream; + private final FSDataInputStream indexInputStream; + + public DfsPartitionDataReader( + DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException { + super(fileInfo, headerBuffer, indexBuffer); + FileSystem fileSystem = + StorageManager.hadoopFs() + .get( + fileInfo.isHdfs() + ? StorageInfo.Type.HDFS + : fileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.OSS); + this.dataInputStream = fileSystem.open(fileInfo.getDfsPath()); + this.indexInputStream = fileSystem.open(fileInfo.getDfsIndexPath()); + this.dataFileSize = fileSystem.getFileStatus(fileInfo.getDfsPath()).getLen(); + this.indexFileSize = fileSystem.getFileStatus(fileInfo.getDfsIndexPath()).getLen(); + } + + @Override + public void readIndexBuffer(long targetPosition) throws IOException { + indexInputStream.seek(targetPosition); + readHeaderOrIndexBuffer( + indexInputStream, + indexBuffer, + indexFileSize, + indexBuffer.capacity(), + fileInfo.getIndexPath()); + } + + @Override + public void position(long targetPosition) throws IOException { + dataInputStream.seek(targetPosition); + } + + @Override + public void readHeaderBuffer(int headerSize) throws IOException { + readHeaderOrIndexBuffer( + dataInputStream, headerBuffer, dataFileSize, headerSize, fileInfo.getFilePath()); + } + + @Override + public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, String filePath) + throws IOException { + Utils.checkFileIntegrity(fileSize - dataInputStream.getPos(), length, filePath); + ByteBuffer tmpBuffer = ByteBuffer.allocate(length); + while (tmpBuffer.hasRemaining()) { + dataInputStream.read(tmpBuffer); + } + tmpBuffer.flip(); + buf.writeBytes(tmpBuffer); + } + + @Override + public long position() throws IOException { + return dataInputStream.getPos(); + } + + @Override + public void close() { + IOUtils.closeQuietly(dataInputStream); + IOUtils.closeQuietly(indexInputStream); + } + + private void readHeaderOrIndexBuffer( + FSDataInputStream inputStream, ByteBuffer buffer, long fileSize, int length, String filePath) + throws IOException { + Utils.checkFileIntegrity(fileSize - inputStream.getPos(), length, filePath); + buffer.clear(); + buffer.limit(length); + while (buffer.hasRemaining()) { + inputStream.read(buffer); + } + buffer.flip(); + } +} diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java new file mode 100644 index 000000000..c5c47aedf --- 
/dev/null +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.storage; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.io.IOUtils; + +import org.apache.celeborn.common.meta.DiskFileInfo; +import org.apache.celeborn.common.util.FileChannelUtils; +import org.apache.celeborn.common.util.Utils; + +public class LocalPartitionDataReader extends PartitionDataReader { + + private final FileChannel dataFileChanel; + private final FileChannel indexFileChannel; + + public LocalPartitionDataReader( + DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException { + super(fileInfo, headerBuffer, indexBuffer); + this.dataFileChanel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath()); + this.indexFileChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath()); + this.dataFileSize = dataFileChanel.size(); + this.indexFileSize = indexFileChannel.size(); + } + + @Override + public void readIndexBuffer(long targetPosition) throws IOException { + indexFileChannel.position(targetPosition); + readHeaderOrIndexBuffer( + indexFileChannel, + indexBuffer, + indexFileSize, + indexBuffer.capacity(), + fileInfo.getIndexPath()); + } + + @Override + public void position(long targetPosition) throws IOException { + dataFileChanel.position(targetPosition); + } + + @Override + public void readHeaderBuffer(int headerSize) throws IOException { + readHeaderOrIndexBuffer( + dataFileChanel, headerBuffer, dataFileSize, headerSize, fileInfo.getFilePath()); + } + + @Override + public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, String filePath) + throws IOException { + Utils.checkFileIntegrity(fileSize - dataFileChanel.position(), length, filePath); + ByteBuffer tmpBuffer = ByteBuffer.allocate(length); + while (tmpBuffer.hasRemaining()) { + dataFileChanel.read(tmpBuffer); + } + tmpBuffer.flip(); + buf.writeBytes(tmpBuffer); + } + + @Override + public long position() throws IOException { + return dataFileChanel.position(); + } + + @Override + public void close() { + IOUtils.closeQuietly(dataFileChanel); + IOUtils.closeQuietly(indexFileChannel); + } + + private void readHeaderOrIndexBuffer( + FileChannel channel, ByteBuffer buffer, long fileSize, int length, String filePath) + throws IOException { + Utils.checkFileIntegrity(fileSize - channel.position(), length, filePath); + buffer.clear(); + buffer.limit(length); + while (buffer.hasRemaining()) { + channel.read(buffer); + } + buffer.flip(); + } +} diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java index d88370edf..66d3be67b 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java @@ -18,7 +18,6 @@ package org.apache.celeborn.service.deploy.worker.storage; import java.io.IOException; -import java.nio.channels.FileChannel; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; @@ -28,13 +27,11 @@ import java.util.function.Consumer; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.celeborn.common.meta.DiskFileInfo; import org.apache.celeborn.common.meta.MapFileMeta; -import org.apache.celeborn.common.util.FileChannelUtils; import org.apache.celeborn.common.util.JavaUtils; import org.apache.celeborn.common.util.ThreadUtils; import org.apache.celeborn.service.deploy.worker.memory.BufferQueue; @@ -49,9 +46,6 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis protected final ExecutorService readExecutor; protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers = JavaUtils.newConcurrentHashMap(); - private FileChannel dataFileChanel; - private FileChannel indexChannel; - private long indexSize; private volatile boolean isReleased = false; private final BufferQueue bufferQueue = new BufferQueue(); private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false); @@ -95,9 +89,6 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis threadsPerMountPoint, String.format("worker-map-partition-%s-reader", mapFileMeta.getMountPoint()), false)); - this.dataFileChanel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath()); - this.indexChannel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath()); - this.indexSize = indexChannel.size(); MemoryManager.instance().addReadBufferTargetChangeListener(this); } @@ -174,7 +165,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis } protected void openReader(MapPartitionDataReader reader) throws IOException { - reader.open(dataFileChanel, indexChannel, indexSize); + reader.open(); } public synchronized void readBuffers() { @@ -252,8 +243,8 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis bufferQueue.release(); isReleased = true; - IOUtils.closeQuietly(dataFileChanel); - IOUtils.closeQuietly(indexChannel); + readers.values().forEach(MapPartitionDataReader::close); + readers.clear(); MemoryManager.instance().removeReadBufferTargetChangeListener(this); } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java index 8f8a2c353..a0a5ce506 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java @@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.worker.storage; import java.io.IOException; import java.nio.ByteBuffer; import 
java.nio.channels.ClosedChannelException; -import java.nio.channels.FileChannel; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; @@ -95,13 +94,6 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader @GuardedBy("lock") protected boolean errorNotified; - private FileChannel dataFileChannel; - - // The size of the data file, it is initialized in the open method and remains unchanged - // afterward. - private long dataFileChannelSize; - private FileChannel indexFileChannel; - private Channel associatedChannel; private Runnable recycleStream; @@ -109,6 +101,8 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader protected AtomicInteger numInUseBuffers = new AtomicInteger(0); private boolean isOpen = false; + private PartitionDataReader partitionDataReader; + public MapPartitionDataReader( int startPartitionIndex, int endPartitionIndex, @@ -132,15 +126,16 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader this.readFinished = false; } - public void open(FileChannel dataFileChannel, FileChannel indexFileChannel, long indexSize) - throws IOException { + public void open() throws IOException { if (!isOpen) { - this.dataFileChannel = dataFileChannel; - this.dataFileChannelSize = dataFileChannel.size(); - this.indexFileChannel = indexFileChannel; + this.partitionDataReader = + fileInfo.isDFS() + ? new DfsPartitionDataReader(fileInfo, headerBuffer, indexBuffer) + : new LocalPartitionDataReader(fileInfo, headerBuffer, indexBuffer); // index is (offset,length) long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE; - this.numRegions = Utils.checkedDownCast(indexSize / indexRegionSize); + this.numRegions = + Utils.checkedDownCast(partitionDataReader.getIndexFileSize() / indexRegionSize); updateConsumingOffset(); isOpen = true; @@ -285,44 +280,6 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader return mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE; } - protected void readHeaderOrIndexBuffer(FileChannel channel, ByteBuffer buffer, int length) - throws IOException { - Utils.checkFileIntegrity(channel, length); - buffer.clear(); - buffer.limit(length); - while (buffer.hasRemaining()) { - channel.read(buffer); - } - buffer.flip(); - } - - protected void readBufferIntoReadBuffer(FileChannel channel, ByteBuf buf, int length) - throws IOException { - Utils.checkFileIntegrity(channel, length); - ByteBuffer tmpBuffer = ByteBuffer.allocate(length); - while (tmpBuffer.hasRemaining()) { - channel.read(tmpBuffer); - } - tmpBuffer.flip(); - buf.writeBytes(tmpBuffer); - } - - protected int readBuffer( - String filename, FileChannel channel, ByteBuffer header, ByteBuf buffer, int headerSize) - throws IOException { - readHeaderOrIndexBuffer(channel, header, headerSize); - // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total Compressed Length(4) - // we need size here,so we read length directly - int bufferLength = header.getInt(12); - if (bufferLength <= 0 || bufferLength > buffer.capacity()) { - logger.error("Incorrect buffer header, buffer length: {}.", bufferLength); - throw new FileCorruptedException("File " + filename + " is corrupted"); - } - buffer.writeBytes(header); - readBufferIntoReadBuffer(channel, buffer, bufferLength); - return bufferLength + headerSize; - } - protected void updateConsumingOffset() throws IOException { while (currentPartitionRemainingBytes == 0 && 
(currentDataRegion < numRegions - 1 || numRemainingPartitions > 0)) { @@ -331,10 +288,10 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader numRemainingPartitions = endPartitionIndex - startPartitionIndex + 1; // read the target index entry to the target index buffer - indexFileChannel.position( + long targetPosition = currentDataRegion * getIndexRegionSize() - + (long) startPartitionIndex * INDEX_ENTRY_SIZE); - readHeaderOrIndexBuffer(indexFileChannel, indexBuffer, indexBuffer.capacity()); + + (long) startPartitionIndex * INDEX_ENTRY_SIZE; + partitionDataReader.readIndexBuffer(targetPosition); } // get the data file offset and the data size @@ -345,13 +302,14 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader logger.debug( "readBuffer updateConsumingOffset, {}, {}, {}, {}", streamId, - dataFileChannelSize, + partitionDataReader.getDataFileSize(), dataConsumingOffset, currentPartitionRemainingBytes); // if these checks fail, the partition file must be corrupted if (dataConsumingOffset < 0 - || dataConsumingOffset + currentPartitionRemainingBytes > dataFileChannelSize + || dataConsumingOffset + currentPartitionRemainingBytes + > partitionDataReader.getDataFileSize() || currentPartitionRemainingBytes < 0) { throw new FileCorruptedException("File " + fileInfo.getFilePath() + " is corrupted"); } @@ -360,17 +318,9 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader private synchronized boolean readBuffer(ByteBuf buffer) throws IOException { try { - dataFileChannel.position(dataConsumingOffset); - - int readSize = - readBuffer( - fileInfo.getFilePath(), - dataFileChannel, - headerBuffer, - buffer, - headerBuffer.capacity()); + int readSize = partitionDataReader.readBuffer(buffer, dataConsumingOffset); currentPartitionRemainingBytes -= readSize; - dataConsumingOffset = dataFileChannel.position(); + dataConsumingOffset = partitionDataReader.position(); logger.debug( "readBuffer data: {}, {}, {}, {}, {}, {}", @@ -388,7 +338,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader logger.debug( "readBuffer end, {}, {}, {}, {}", streamId, - dataFileChannelSize, + partitionDataReader.getDataFileSize(), dataConsumingOffset, currentPartitionRemainingBytes); int prevDataRegion = currentDataRegion; @@ -399,7 +349,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader logger.debug( "readBuffer run: {}, {}, {}, {}", streamId, - dataFileChannelSize, + partitionDataReader.getDataFileSize(), dataConsumingOffset, currentPartitionRemainingBytes); return true; @@ -557,4 +507,8 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader return !isReleased && !readFinished; } } + + public void close() { + partitionDataReader.close(); + } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java new file mode 100644 index 000000000..48b08d741 --- /dev/null +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.storage; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.celeborn.common.exception.FileCorruptedException; +import org.apache.celeborn.common.meta.DiskFileInfo; + +public abstract class PartitionDataReader { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionDataReader.class); + + protected final DiskFileInfo fileInfo; + protected final ByteBuffer headerBuffer; + protected final ByteBuffer indexBuffer; + + protected long dataFileSize; + protected long indexFileSize; + + public PartitionDataReader( + DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) { + this.fileInfo = fileInfo; + this.headerBuffer = headerBuffer; + this.indexBuffer = indexBuffer; + } + + public abstract void readIndexBuffer(long targetPosition) throws IOException; + + public abstract void position(long targetPosition) throws IOException; + + public abstract void readHeaderBuffer(int headSize) throws IOException; + + public abstract void readBufferIntoReadBuffer( + ByteBuf buf, long fileSize, int length, String filePath) throws IOException; + + public abstract long position() throws IOException; + + public abstract void close(); + + public int readBuffer(ByteBuf buffer, long dataConsumingOffset) throws IOException { + position(dataConsumingOffset); + int headerSize = headerBuffer.capacity(); + readHeaderBuffer(headerSize); + // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total Compressed Length(4) + // we need size here,so we read length directly + int bufferLength = headerBuffer.getInt(12); + if (bufferLength <= 0 || bufferLength > buffer.capacity()) { + LOG.error("Incorrect buffer header, buffer length: {}.", bufferLength); + throw new FileCorruptedException( + String.format("File %s is corrupted", fileInfo.getFilePath())); + } + buffer.writeBytes(headerBuffer); + readBufferIntoReadBuffer(buffer, dataFileSize, bufferLength, fileInfo.getFilePath()); + return bufferLength + headerSize; + } + + public long getDataFileSize() { + return dataFileSize; + } + + public long getIndexFileSize() { + return indexFileSize; + } +} diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala index 0eeaa5e70..902d720ee 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala @@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem import org.roaringbitmap.RoaringBitmap import org.slf4j.{Logger, LoggerFactory} -import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, MemoryFileInfo, 
ReduceFileMeta} -import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart} +import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, ReduceFileMeta} +import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart, StorageInfo} import org.apache.celeborn.common.unsafe.Platform import org.apache.celeborn.common.util.FileChannelUtils @@ -94,7 +94,7 @@ trait PartitionMetaHandler { class MapPartitionMetaHandler( diskFileInfo: DiskFileInfo, notifier: FlushNotifier) extends PartitionMetaHandler { - lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get() + lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get(diskFileInfo.getStorageType) val logger: Logger = LoggerFactory.getLogger(classOf[MapPartitionMetaHandler]) val fileMeta: MapFileMeta = diskFileInfo.getFileMeta.asInstanceOf[MapFileMeta] var numSubpartitions = 0 @@ -286,28 +286,7 @@ class MapPartitionMetaHandler( } override def afterClose(): Unit = { - // TODO: force flush the index file channel in scenarios which the upstream task writes and - // downstream task reads simultaneously, such as flink hybrid shuffle - if (indexBuffer != null) { - logger.debug(s"flushIndex start:${diskFileInfo.getIndexPath}") - val startTime = System.currentTimeMillis - indexBuffer.flip - notifier.checkException() - try { - if (indexBuffer.hasRemaining) { - // map partition synchronously writes file index - if (indexChannel != null) while (indexBuffer.hasRemaining) indexChannel.write(indexBuffer) - else if (diskFileInfo.isDFS) { - val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath) - dfsStream.write(indexBuffer.array) - dfsStream.close() - } - } - indexBuffer.clear - } finally logger.debug( - s"flushIndex end:${diskFileInfo.getIndexPath}, " + - s"cost:${System.currentTimeMillis - startTime}") - } + flushIndex() } override def beforeWrite(bytes: ByteBuf): Unit = { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 3070e4b94..b71110947 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -43,7 +43,7 @@ import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf} import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo} import org.apache.celeborn.common.quota.ResourceConsumption -import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils} +import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils} import org.apache.celeborn.service.deploy.worker._ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener @@ -819,7 +819,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } } } - if (null != diskOperators) { + if (CollectionUtils.isNotEmpty(diskOperators)) { if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { cleanupExpiredShuffleKey(shuffleKeySet(), false) } diff 
--git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala index 7c3f1b966..fa0934efb 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala @@ -494,7 +494,7 @@ class DfsTierWriter( notifier: FlushNotifier, flusher: Flusher, source: AbstractSource, - hdfsFileInfo: DiskFileInfo, + dfsFileInfo: DiskFileInfo, storageType: StorageInfo.Type, partitionDataWriterContext: PartitionDataWriterContext, storageManager: StorageManager) @@ -503,7 +503,7 @@ class DfsTierWriter( metaHandler, numPendingWrites, notifier, - hdfsFileInfo, + dfsFileInfo, source, storageType, partitionDataWriterContext.getPartitionLocation.getFileName, @@ -520,21 +520,21 @@ class DfsTierWriter( var partNumber: Int = 1 this.flusherBufferSize = - if (hdfsFileInfo.isS3()) { + if (dfsFileInfo.isS3()) { conf.workerS3FlusherBufferSize - } else if (hdfsFileInfo.isOSS()) { + } else if (dfsFileInfo.isOSS()) { conf.workerOssFlusherBufferSize } else { conf.workerHdfsFlusherBufferSize } try { - hadoopFs.create(hdfsFileInfo.getDfsPath, true).close() - if (hdfsFileInfo.isS3) { + hadoopFs.create(dfsFileInfo.getDfsPath, true).close() + if (dfsFileInfo.isS3) { val uri = hadoopFs.getUri val bucketName = uri.getHost - val index = hdfsFileInfo.getFilePath.indexOf(bucketName) - val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1) + val index = dfsFileInfo.getFilePath.indexOf(bucketName) + val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1) this.s3MultipartUploadHandler = TierWriterHelper.getS3MultipartUploadHandler( hadoopFs, @@ -544,7 +544,7 @@ class DfsTierWriter( conf.s3MultiplePartUploadBaseDelay, conf.s3MultiplePartUploadMaxBackoff) s3MultipartUploadHandler.startUpload() - } else if (hdfsFileInfo.isOSS) { + } else if (dfsFileInfo.isOSS) { val configuration = hadoopFs.getConf val ossEndpoint = configuration.get("fs.oss.endpoint") val ossAccessKey = configuration.get("fs.oss.accessKeyId") @@ -552,8 +552,8 @@ class DfsTierWriter( val uri = hadoopFs.getUri val bucketName = uri.getHost - val index = hdfsFileInfo.getFilePath.indexOf(bucketName) - val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1) + val index = dfsFileInfo.getFilePath.indexOf(bucketName) + val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1) this.ossMultipartUploadHandler = TierWriterHelper.getOssMultipartUploadHandler( ossEndpoint, @@ -572,7 +572,7 @@ class DfsTierWriter( case ex: InterruptedException => throw new RuntimeException(ex) } - hadoopFs.create(hdfsFileInfo.getDfsPath, true).close() + hadoopFs.create(dfsFileInfo.getDfsPath, true).close() } storageManager.registerDiskFilePartitionWriter( @@ -586,9 +586,9 @@ class DfsTierWriter( override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask = { notifier.numPendingFlushes.incrementAndGet() - if (hdfsFileInfo.isHdfs) { - new HdfsFlushTask(flushBuffer, hdfsFileInfo.getDfsPath(), notifier, true, source) - } else if (hdfsFileInfo.isOSS) { + if (dfsFileInfo.isHdfs) { + new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, source) + } else if (dfsFileInfo.isOSS) { val flushTask = new OssFlushTask( flushBuffer, notifier, @@ -641,17 +641,19 @@ class DfsTierWriter( } override def closeStreams(): Unit = { - if 
(hadoopFs.exists(hdfsFileInfo.getDfsPeerWriterSuccessPath)) { - hadoopFs.delete(hdfsFileInfo.getDfsPath, false) + if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) { + hadoopFs.delete(dfsFileInfo.getDfsPath, false) deleted = true } else { - hadoopFs.create(hdfsFileInfo.getDfsWriterSuccessPath).close() - val indexOutputStream = hadoopFs.create(hdfsFileInfo.getDfsIndexPath) - indexOutputStream.writeInt(hdfsFileInfo.getReduceFileMeta.getChunkOffsets.size) - for (offset <- hdfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) { - indexOutputStream.writeLong(offset) + hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close() + if (dfsFileInfo.isReduceFileMeta) { + val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath) + indexOutputStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size) + for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) { + indexOutputStream.writeLong(offset) + } + indexOutputStream.close() } - indexOutputStream.close() } if (s3MultipartUploadHandler != null) { s3MultipartUploadHandler.complete() @@ -664,12 +666,12 @@ class DfsTierWriter( } override def notifyFileCommitted(): Unit = - storageManager.notifyFileInfoCommitted(shuffleKey, filename, hdfsFileInfo) + storageManager.notifyFileInfoCommitted(shuffleKey, filename, dfsFileInfo) override def closeResource(): Unit = {} override def cleanLocalOrDfsFiles(): Unit = { - hdfsFileInfo.deleteAllFiles(hadoopFs) + dfsFileInfo.deleteAllFiles(hadoopFs) } override def takeBufferInternal(): CompositeByteBuf = { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala index 8c3a47a07..95d69fc12 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala @@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable +import org.apache.commons.lang3.StringUtils + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.util.{CelebornExitKind, Utils} @@ -151,7 +153,9 @@ trait MiniClusterFeature extends Logging { def createWorker(map: Map[String, String], storageDir: String): Worker = { logInfo("start create worker for mini cluster") val conf = new CelebornConf() - conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir) + if (StringUtils.isNotEmpty(storageDir)) { + conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir) + } conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false") conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K") conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")
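
Editor's note (not part of the patch): `WordCountTestWithHDFS` above exercises the new DFS read path by pointing Celeborn at a MiniDFSCluster through three `CelebornConf` keys. The sketch below shows the equivalent configuration using the same keys the test imports (`ACTIVE_STORAGE_TYPES`, `WORKER_STORAGE_CREATE_FILE_POLICY`, `HDFS_DIR`). It is illustrative only: the `hdfs://namenode:9000/celeborn` path is a placeholder for a real HDFS directory, and the literal value "HDFS" assumes that is what `StorageEnum.HDFS.getValue` resolves to in the test.

```scala
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{ACTIVE_STORAGE_TYPES, HDFS_DIR, WORKER_STORAGE_CREATE_FILE_POLICY}

// Placeholder base directory; WordCountTestWithHDFS uses the MiniDFSCluster URI plus "/test" instead.
val hdfsBase = "hdfs://namenode:9000/celeborn"

val conf = new CelebornConf()
// Make HDFS an available storage type and prefer it when creating new shuffle files.
conf.set(ACTIVE_STORAGE_TYPES.key, "HDFS")
conf.set(WORKER_STORAGE_CREATE_FILE_POLICY.key, "HDFS")
// Base HDFS directory under which workers write shuffle data and index files.
conf.set(HDFS_DIR.key, hdfsBase)
```

In the test the same map is supplied to both the worker and the client configuration (`getWorkerConf` and `getClientConf`), so MapPartition (Flink) shuffle data is written to and read back from DFS end to end.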