This is an automated email from the ASF dual-hosted git repository. ckj pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit c5975a5b0704dd5f6769f4fc2f6db1b0a695bff4 Author: Xianming Lei <[email protected]> AuthorDate: Thu Oct 27 14:28:47 2022 +0800 [ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275) ### What changes were proposed in this pull request? For issue#239, Fix inconsistent blocks when reading shuffle data. ### Why are the changes needed? This problem will cause reading shuffle data to fail. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Already added UT Co-authored-by: leixianming <[email protected]> --- .../apache/uniffle/common/ShuffleIndexResult.java | 10 +- .../org/apache/uniffle/common/util/RssUtils.java | 14 +- .../uniffle/common/ShuffleIndexResultTest.java | 2 +- .../apache/uniffle/common/util/RssUtilsTest.java | 4 +- .../client/impl/grpc/ShuffleServerGrpcClient.java | 2 +- .../response/RssGetShuffleIndexResponse.java | 4 +- proto/src/main/proto/Rss.proto | 1 + .../uniffle/server/ShuffleServerGrpcService.java | 1 + .../storage/handler/impl/HdfsFileReader.java | 7 +- .../handler/impl/HdfsShuffleReadHandler.java | 13 +- .../handler/impl/LocalFileServerReadHandler.java | 10 +- .../handler/impl/HdfsShuffleReadHandlerTest.java | 139 ++++++++++++++++++ .../storage/handler/impl/LocalFileHandlerTest.java | 112 +++------------ .../handler/impl/LocalFileHandlerTestBase.java | 157 +++++++++++++++++++++ .../impl/LocalFileServerReadHandlerTest.java | 109 ++++++++++++++ 15 files changed, 475 insertions(+), 110 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java index 36dc8213..7e5d5b58 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java +++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java @@ -19,19 +19,25 @@ package org.apache.uniffle.common; public class ShuffleIndexResult { private final byte[] indexData; + private 
long dataFileLen; public ShuffleIndexResult() { - this(new byte[0]); + this(new byte[0], -1); } - public ShuffleIndexResult(byte[] bytes) { + public ShuffleIndexResult(byte[] bytes, long dataFileLen) { this.indexData = bytes; + this.dataFileLen = dataFileLen; } public byte[] getIndexData() { return indexData; } + public long getDataFileLen() { + return dataFileLen; + } + public boolean isEmpty() { return indexData == null || indexData.length == 0; } diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java index 47160204..848d6d01 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java @@ -191,15 +191,18 @@ public class RssUtils { } byte[] indexData = shuffleIndexResult.getIndexData(); - return transIndexDataToSegments(indexData, readBufferSize); + long dataFileLen = shuffleIndexResult.getDataFileLen(); + return transIndexDataToSegments(indexData, readBufferSize, dataFileLen); } - private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData, int readBufferSize) { + private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData, + int readBufferSize, long dataFileLen) { ByteBuffer byteBuffer = ByteBuffer.wrap(indexData); List<BufferSegment> bufferSegments = Lists.newArrayList(); List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList(); int bufferOffset = 0; long fileOffset = -1; + long totalLength = 0; while (byteBuffer.hasRemaining()) { try { @@ -218,6 +221,13 @@ public class RssUtils { bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); bufferOffset += length; + totalLength += length; + + // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater + // than the length in the actual data file, and it needs to be returned at this time to 
avoid EOFException + if (dataFileLen != -1 && totalLength >= dataFileLen) { + break; + } if (bufferOffset >= readBufferSize) { ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java index 74c7c0d3..349e5a88 100644 --- a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java +++ b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java @@ -26,7 +26,7 @@ public class ShuffleIndexResultTest { @Test public void testEmpty() { assertTrue(new ShuffleIndexResult().isEmpty()); - assertTrue(new ShuffleIndexResult(null).isEmpty()); + assertTrue(new ShuffleIndexResult(null, -1).isEmpty()); } } diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java index c67bf482..4c1f5a8c 100644 --- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java +++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java @@ -137,7 +137,7 @@ public class RssUtilsTest { } byte[] data = byteBuffer.array(); - shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize); + shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize); assertEquals(expectedTotalSegmentNum, shuffleDataSegments.size()); assertEquals(0, shuffleDataSegments.get(0).getOffset()); @@ -158,7 +158,7 @@ public class RssUtilsTest { data = incompleteByteBuffer.array(); // It should throw exception try { - RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize); + RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize); fail(); } catch (Exception e) { // ignore diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index bfebd622..a44a4e7d 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -544,7 +544,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer switch (statusCode) { case SUCCESS: response = new RssGetShuffleIndexResponse( - ResponseStatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray()); + ResponseStatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray(), rpcResponse.getDataFileLen()); break; default: diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java index 74ddb826..8884271c 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java @@ -22,9 +22,9 @@ import org.apache.uniffle.common.ShuffleIndexResult; public class RssGetShuffleIndexResponse extends ClientResponse { private final ShuffleIndexResult shuffleIndexResult; - public RssGetShuffleIndexResponse(ResponseStatusCode statusCode, byte[] data) { + public RssGetShuffleIndexResponse(ResponseStatusCode statusCode, byte[] data, long dataFileLen) { super(statusCode); - this.shuffleIndexResult = new ShuffleIndexResult(data); + this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen); } public ShuffleIndexResult getShuffleIndexResult() { diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 26cd8b51..1f034c08 100644 --- a/proto/src/main/proto/Rss.proto +++ 
b/proto/src/main/proto/Rss.proto @@ -110,6 +110,7 @@ message GetLocalShuffleIndexResponse { bytes indexData = 1; StatusCode status = 2; string retMsg = 3; + int64 dataFileLen = 4; } message ReportShuffleResultRequest { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 0501deca..df5719f7 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -532,6 +532,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { + " bytes with {}", readTime, data.length, requestInfo); builder.setIndexData(UnsafeByteOperations.unsafeWrap(data)); + builder.setDataFileLen(shuffleIndexResult.getDataFileLen()); reply = builder.build(); } catch (Exception e) { status = StatusCode.INTERNAL_ERROR; diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java index a6414114..38d6439e 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java @@ -37,6 +37,7 @@ public class HdfsFileReader implements FileReader, Closeable { private Path path; private Configuration hadoopConf; private FSDataInputStream fsDataInputStream; + private FileSystem fileSystem; public HdfsFileReader(Path path, Configuration hadoopConf) throws Exception { this.path = path; @@ -45,7 +46,7 @@ public class HdfsFileReader implements FileReader, Closeable { } private void createStream() throws Exception { - FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf); + fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf); if (!fileSystem.isFile(path)) { String msg = path + " don't exist or is not 
a file."; @@ -92,4 +93,8 @@ public class HdfsFileReader implements FileReader, Closeable { public Path getPath() { return path; } + + public long getFileLen() throws IOException { + return fileSystem.getFileStatus(path).getLen(); + } } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java index f91e27f4..af94723f 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java @@ -71,15 +71,15 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler { System.arraycopy(indexData, 0, indexNewData, 0, expectedLen); indexData = indexNewData; } + long dateFileLen = getDataFileLen(); LOG.info("Read index files {}.index for {} ms", filePrefix, System.currentTimeMillis() - start); - return new ShuffleIndexResult(indexData); + return new ShuffleIndexResult(indexData, dateFileLen); } catch (Exception e) { LOG.info("Fail to read index files {}.index", filePrefix, e); } return new ShuffleIndexResult(); } - @Override protected ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment) { // Here we make an assumption that the rest of the file is corrupted, if an unexpected data is read. 
int expectedLength = shuffleDataSegment.getLength(); @@ -115,6 +115,15 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler { return data; } + private long getDataFileLen() { + try { + return dataReader.getFileLen(); + } catch (IOException ioException) { + LOG.error("getDataFileLen failed for " + ShuffleStorageUtils.generateDataFileName(filePrefix), ioException); + return -1; + } + } + public synchronized void close() { try { dataReader.close(); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java index 1eeb75bc..19f5bc5b 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java @@ -139,14 +139,16 @@ public class LocalFileServerReadHandler implements ServerReadHandler { int indexNum = 0; int len = 0; try (LocalFileReader reader = createFileReader(indexFileName)) { - long fileSize = new File(indexFileName).length(); - indexNum = (int) (fileSize / FileBasedShuffleSegment.SEGMENT_SIZE); + long indexFileSize = new File(indexFileName).length(); + indexNum = (int) (indexFileSize / FileBasedShuffleSegment.SEGMENT_SIZE); len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE; - if (fileSize != len) { + if (indexFileSize != len) { LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", indexFileName); } byte[] indexData = reader.read(0, len); - return new ShuffleIndexResult(indexData); + // get dataFileSize for read segment generation in DataSkippableReadHandler#readShuffleData + long dataFileSize = new File(dataFileName).length(); + return new ShuffleIndexResult(indexData, dataFileSize); } catch (Exception e) { LOG.error("Fail to read index file {} indexNum {} len {}", indexFileName, indexNum, len); diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java index 8c5dd7a7..cd2df454 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java @@ -17,10 +17,15 @@ package org.apache.uniffle.storage.handler.impl; +import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; @@ -30,8 +35,12 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.util.ChecksumUtils; +import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.storage.HdfsShuffleHandlerTestBase; import org.apache.uniffle.storage.HdfsTestBase; +import org.apache.uniffle.storage.common.FileBasedShuffleSegment; import org.apache.uniffle.storage.util.ShuffleStorageUtils; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -99,4 +108,134 @@ public class HdfsShuffleReadHandlerTest extends HdfsTestBase { public void test() { createAndRunCases(HDFS_URI, conf, StringUtils.EMPTY); } + + @Test + public void testDataInconsistent() { + + try { + String basePath = HDFS_URI + "HdfsShuffleFileReadHandlerTest#testDataInconsistent"; + TestHdfsShuffleWriteHandler writeHandler = + new TestHdfsShuffleWriteHandler( + "appId", + 0, + 1, + 1, + basePath, + "test", + conf, + StringUtils.EMPTY); + + Map<Long, byte[]> 
expectedData = Maps.newHashMap(); + int totalBlockNum = 0; + int expectTotalBlockNum = 6; + int blockSize = 7; + int taskAttemptId = 0; + + // write expectTotalBlockNum - 1 complete block + HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum - 1, + blockSize, taskAttemptId, expectedData); + + // write 1 incomplete block , which only write index file + List<ShufflePartitionedBlock> blocks = Lists.newArrayList(); + byte[] buf = new byte[blockSize]; + new Random().nextBytes(buf); + long blockId = (expectTotalBlockNum + << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH)) + + taskAttemptId; + blocks.add(new ShufflePartitionedBlock(blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId, + taskAttemptId, buf)); + writeHandler.writeIndex(blocks); + + int readBufferSize = 13; + int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize); + Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf(); + Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf(); + expectedData.forEach((id, block) -> expectBlockIds.addLong(id)); + String fileNamePrefix = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, + ShuffleStorageUtils.getShuffleDataPathWithRange("appId", + 0, 1, 1, 10)) + "/test_0"; + HdfsShuffleReadHandler handler = + new HdfsShuffleReadHandler("appId", 0, 1, fileNamePrefix, + readBufferSize, expectBlockIds, processBlockIds, conf); + + Set<Long> actualBlockIds = Sets.newHashSet(); + for (int i = 0; i < total; ++i) { + ShuffleDataResult shuffleDataResult = handler.readShuffleData(); + totalBlockNum += shuffleDataResult.getBufferSegments().size(); + HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData); + for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) { + actualBlockIds.add(bufferSegment.getBlockId()); + } + } + + assertNull(handler.readShuffleData()); + assertEquals( + total, + 
handler.getShuffleDataSegments().size()); + // The last block cannot be read, only the index is generated + assertEquals(expectTotalBlockNum - 1, totalBlockNum); + assertEquals(expectedData.keySet(), actualBlockIds); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + static class TestHdfsShuffleWriteHandler extends HdfsShuffleWriteHandler { + + private Configuration hadoopConf; + private Lock writeLock = new ReentrantLock(); + private String basePath; + private String fileNamePrefix; + private int failTimes = 0; + + TestHdfsShuffleWriteHandler( + String appId, + int shuffleId, + int startPartition, + int endPartition, + String storageBasePath, + String fileNamePrefix, + Configuration hadoopConf, + String user) throws Exception { + super(appId, shuffleId, startPartition, endPartition, storageBasePath, fileNamePrefix, hadoopConf, user); + this.hadoopConf = hadoopConf; + this.fileNamePrefix = fileNamePrefix; + this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, + ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition)); + } + + + // only write index file + public void writeIndex( + List<ShufflePartitionedBlock> shuffleBlocks) throws IOException, IllegalStateException { + HdfsFileWriter indexWriter = null; + writeLock.lock(); + try { + try { + String indexFileName = ShuffleStorageUtils.generateIndexFileName(fileNamePrefix + "_" + failTimes); + indexWriter = createWriter(indexFileName); + for (ShufflePartitionedBlock block : shuffleBlocks) { + long blockId = block.getBlockId(); + long crc = block.getCrc(); + long startOffset = indexWriter.nextOffset(); + + FileBasedShuffleSegment segment = new FileBasedShuffleSegment( + blockId, startOffset, block.getLength(), block.getUncompressLength(), crc, block.getTaskAttemptId()); + indexWriter.writeIndex(segment); + } + } catch (Exception e) { + failTimes++; + throw new RuntimeException(e); + } finally { + if (indexWriter != null) { + 
indexWriter.close(); + } + } + } finally { + writeLock.unlock(); + } + } + } } diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java index 83df32db..a0c0043f 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java @@ -21,37 +21,24 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Files; import org.junit.jupiter.api.Test; -import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataResult; -import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; -import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.config.RssBaseConf; -import org.apache.uniffle.common.util.ChecksumUtils; -import org.apache.uniffle.common.util.RssUtils; -import org.apache.uniffle.storage.handler.api.ServerReadHandler; -import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; import org.apache.uniffle.storage.util.ShuffleStorageUtils; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class LocalFileHandlerTest { - private static AtomicLong ATOMIC_LONG = new AtomicLong(0L); - @Test public void writeTest() throws Exception { File tmpDir = Files.createTempDir(); @@ -76,15 +63,15 @@ public class LocalFileHandlerTest { 
final Set<Long> expectedBlockIds1 = Sets.newHashSet(); final Set<Long> expectedBlockIds2 = Sets.newHashSet(); - writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1); - writeTestData(writeHandler1, 2, 32, expectedData, expectedBlockIds1); - writeTestData(writeHandler1, 3, 32, expectedData, expectedBlockIds1); - writeTestData(writeHandler1, 4, 32, expectedData, expectedBlockIds1); + LocalFileHandlerTestBase.writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1); + LocalFileHandlerTestBase.writeTestData(writeHandler1, 2, 32, expectedData, expectedBlockIds1); + LocalFileHandlerTestBase.writeTestData(writeHandler1, 3, 32, expectedData, expectedBlockIds1); + LocalFileHandlerTestBase.writeTestData(writeHandler1, 4, 32, expectedData, expectedBlockIds1); - writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2); - writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2); - writeTestData(writeHandler2, 2, 32, expectedData, expectedBlockIds2); - writeTestData(writeHandler2, 1, 32, expectedData, expectedBlockIds2); + LocalFileHandlerTestBase.writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2); + LocalFileHandlerTestBase.writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2); + LocalFileHandlerTestBase.writeTestData(writeHandler2, 2, 32, expectedData, expectedBlockIds2); + LocalFileHandlerTestBase.writeTestData(writeHandler2, 1, 32, expectedData, expectedBlockIds2); RssBaseConf conf = new RssBaseConf(); conf.setString("rss.storage.basePath", dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath()); @@ -93,21 +80,22 @@ public class LocalFileHandlerTest { LocalFileServerReadHandler readHandler2 = new LocalFileServerReadHandler( "appId", 0, 2, 1, 10, dataDir1.getAbsolutePath()); - validateResult(readHandler1, expectedBlockIds1, expectedData); - validateResult(readHandler2, expectedBlockIds2, expectedData); + LocalFileHandlerTestBase.validateResult(readHandler1, expectedBlockIds1, 
expectedData); + LocalFileHandlerTestBase.validateResult(readHandler2, expectedBlockIds2, expectedData); // after first read, write more data - writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1); + LocalFileHandlerTestBase.writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1); // new data should be read - validateResult(readHandler1, expectedBlockIds1, expectedData); + LocalFileHandlerTestBase.validateResult(readHandler1, expectedBlockIds1, expectedData); - File targetDataFile = new File(possiblePath1, "pre.data"); - ShuffleIndexResult shuffleIndexResult = readIndex(readHandler1); + ShuffleIndexResult shuffleIndexResult = LocalFileHandlerTestBase.readIndex(readHandler1); assertFalse(shuffleIndexResult.isEmpty()); - List<ShuffleDataResult> shuffleDataResults = readData(readHandler1, shuffleIndexResult); + assertEquals(352, shuffleIndexResult.getDataFileLen()); + List<ShuffleDataResult> shuffleDataResults = LocalFileHandlerTestBase.readData(readHandler1, shuffleIndexResult); assertFalse(shuffleDataResults.isEmpty()); + File targetDataFile = new File(possiblePath1, "pre.data"); targetDataFile.delete(); - shuffleDataResults = readData(readHandler1, shuffleIndexResult); + shuffleDataResults = LocalFileHandlerTestBase.readData(readHandler1, shuffleIndexResult); for (ShuffleDataResult shuffleData : shuffleDataResults) { assertEquals(0, shuffleData.getData().length); assertTrue(shuffleData.isEmpty()); @@ -129,71 +117,9 @@ public class LocalFileHandlerTest { assertEquals(writer.nextOffset(), totalSize); } + @Test + public void testReadIndex() { - private void writeTestData( - ShuffleWriteHandler writeHandler, - int num, int length, - Map<Long, byte[]> expectedData, - Set<Long> expectedBlockIds) throws Exception { - List<ShufflePartitionedBlock> blocks = Lists.newArrayList(); - for (int i = 0; i < num; i++) { - byte[] buf = new byte[length]; - new Random().nextBytes(buf); - long blockId = ATOMIC_LONG.incrementAndGet(); - blocks.add(new 
ShufflePartitionedBlock(length, length, ChecksumUtils.getCrc32(buf), blockId, 100, - buf)); - expectedData.put(blockId, buf); - expectedBlockIds.add(blockId); - } - writeHandler.write(blocks); - } - - protected void validateResult(ServerReadHandler readHandler, Set<Long> expectedBlockIds, - Map<Long, byte[]> expectedData) { - List<ShuffleDataResult> shuffleDataResults = readAll(readHandler); - Set<Long> actualBlockIds = Sets.newHashSet(); - for (ShuffleDataResult sdr : shuffleDataResults) { - byte[] buffer = sdr.getData(); - List<BufferSegment> bufferSegments = sdr.getBufferSegments(); - - for (BufferSegment bs : bufferSegments) { - byte[] data = new byte[bs.getLength()]; - System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength()); - assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data)); - assertArrayEquals(expectedData.get(bs.getBlockId()), data); - actualBlockIds.add(bs.getBlockId()); - } - } - assertEquals(expectedBlockIds, actualBlockIds); - } - - private List<ShuffleDataResult> readAll(ServerReadHandler readHandler) { - ShuffleIndexResult shuffleIndexResult = readIndex(readHandler); - return readData(readHandler, shuffleIndexResult); - } - - private ShuffleIndexResult readIndex(ServerReadHandler readHandler) { - ShuffleIndexResult shuffleIndexResult = readHandler.getShuffleIndex(); - return shuffleIndexResult; - } - - private List<ShuffleDataResult> readData(ServerReadHandler readHandler, ShuffleIndexResult shuffleIndexResult) { - List<ShuffleDataResult> shuffleDataResults = Lists.newLinkedList(); - if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { - return shuffleDataResults; - } - - List<ShuffleDataSegment> shuffleDataSegments = - RssUtils.transIndexDataToSegments(shuffleIndexResult, 32); - - for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) { - byte[] shuffleData = - readHandler.getShuffleData(shuffleDataSegment.getOffset(), shuffleDataSegment.getLength()).getData(); - ShuffleDataResult sdr = new 
ShuffleDataResult(shuffleData, shuffleDataSegment.getBufferSegments()); - shuffleDataResults.add(sdr); - } - - return shuffleDataResults; } } diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java new file mode 100644 index 00000000..ae7f0541 --- /dev/null +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.uniffle.storage.handler.impl;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Shared helpers for local-file shuffle handler tests: generate random shuffle
 * blocks, write them through a {@link ShuffleWriteHandler}, read them back via a
 * {@link ServerReadHandler}, and verify the returned bytes against what was written.
 */
public class LocalFileHandlerTestBase {

  // Monotonically increasing block id shared across all tests in the JVM so ids never collide.
  private static final AtomicLong ATOMIC_LONG = new AtomicLong(0L);

  private LocalFileHandlerTestBase() {
    // utility class; no instances
  }

  /**
   * Writes {@code num} random blocks of {@code length} bytes through {@code writeHandler}.
   * Each block's payload is recorded in {@code expectedData} keyed by block id, and the
   * id is added to {@code expectedBlockIds}, so callers can later validate a read-back.
   */
  public static void writeTestData(
      ShuffleWriteHandler writeHandler,
      int num, int length,
      Map<Long, byte[]> expectedData,
      Set<Long> expectedBlockIds) throws Exception {
    List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
    // One Random for the whole batch instead of a fresh instance per block.
    Random random = new Random();
    for (int i = 0; i < num; i++) {
      byte[] buf = new byte[length];
      random.nextBytes(buf);
      long blockId = ATOMIC_LONG.incrementAndGet();
      // 100 is used as the taskAttemptId for every generated block.
      blocks.add(new ShufflePartitionedBlock(length, length, ChecksumUtils.getCrc32(buf), blockId, 100,
          buf));
      expectedData.put(blockId, buf);
      expectedBlockIds.add(blockId);
    }
    writeHandler.write(blocks);
  }

  /**
   * Reads everything the handler can serve and asserts that exactly the expected
   * block ids come back, each with a matching CRC and payload.
   */
  public static void validateResult(ServerReadHandler readHandler, Set<Long> expectedBlockIds,
      Map<Long, byte[]> expectedData) {
    List<ShuffleDataResult> shuffleDataResults = readAll(readHandler);
    Set<Long> actualBlockIds = Sets.newHashSet();
    for (ShuffleDataResult sdr : shuffleDataResults) {
      verifySegments(sdr, expectedData, actualBlockIds);
    }
    assertEquals(expectedBlockIds, actualBlockIds);
  }

  /** Reads the index and then all data segments it describes. */
  public static List<ShuffleDataResult> readAll(ServerReadHandler readHandler) {
    ShuffleIndexResult shuffleIndexResult = readIndex(readHandler);
    return readData(readHandler, shuffleIndexResult);
  }

  /** Fetches the shuffle index from the handler. */
  public static ShuffleIndexResult readIndex(ServerReadHandler readHandler) {
    return readHandler.getShuffleIndex();
  }

  /**
   * Splits the index into read segments (32-byte read buffer) and fetches the
   * corresponding data ranges from the handler, one {@link ShuffleDataResult} per segment.
   * Returns an empty list when the index is null or empty.
   */
  public static List<ShuffleDataResult> readData(ServerReadHandler readHandler, ShuffleIndexResult shuffleIndexResult) {
    List<ShuffleDataResult> shuffleDataResults = Lists.newLinkedList();
    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
      return shuffleDataResults;
    }

    List<ShuffleDataSegment> shuffleDataSegments =
        RssUtils.transIndexDataToSegments(shuffleIndexResult, 32);

    for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) {
      byte[] shuffleData =
          readHandler.getShuffleData(shuffleDataSegment.getOffset(), shuffleDataSegment.getLength()).getData();
      ShuffleDataResult sdr = new ShuffleDataResult(shuffleData, shuffleDataSegment.getBufferSegments());
      shuffleDataResults.add(sdr);
    }

    return shuffleDataResults;
  }

  /** Verifies every buffer segment in one result against the expected payloads. */
  public static void checkData(ShuffleDataResult shuffleDataResult, Map<Long, byte[]> expectedData) {
    verifySegments(shuffleDataResult, expectedData, null);
  }

  // Shared verification loop for validateResult/checkData: CRC and byte-for-byte payload
  // check per segment; optionally collects the seen block ids.
  private static void verifySegments(ShuffleDataResult shuffleDataResult,
      Map<Long, byte[]> expectedData, Set<Long> collectedBlockIds) {
    byte[] buffer = shuffleDataResult.getData();
    List<BufferSegment> bufferSegments = shuffleDataResult.getBufferSegments();

    for (BufferSegment bs : bufferSegments) {
      byte[] data = new byte[bs.getLength()];
      System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
      assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data));
      assertArrayEquals(expectedData.get(bs.getBlockId()), data);
      if (collectedBlockIds != null) {
        collectedBlockIds.add(bs.getBlockId());
      }
    }
  }

  /** Serializes one index entry (40 bytes: offset, length, uncompressLength, crc, blockId, taskAttemptId). */
  public static void writeIndex(ByteBuffer byteBuffer, FileBasedShuffleSegment segment) {
    byteBuffer.putLong(segment.getOffset());
    byteBuffer.putInt(segment.getLength());
    byteBuffer.putInt(segment.getUncompressLength());
    byteBuffer.putLong(segment.getCrc());
    byteBuffer.putLong(segment.getBlockId());
    byteBuffer.putLong(segment.getTaskAttemptId());
  }

  /**
   * Concatenates block payloads into segment-sized byte arrays: blocks are appended in
   * block-id order and a segment is emitted once at least {@code bytesPerSegment} bytes
   * have accumulated; a final partial segment is emitted if data remains.
   * NOTE(review): assumes block ids run 1..blockNum — holds when the blocks were the
   * first ones generated via writeTestData in this JVM; verify in new callers.
   */
  public static List<byte[]> calcSegmentBytes(Map<Long, byte[]> blockIdToData, int bytesPerSegment, int blockNum) {
    List<byte[]> res = Lists.newArrayList();
    int curSize = 0;
    ByteBuffer byteBuffer = ByteBuffer.allocate(2 * bytesPerSegment);
    for (long i = 1; i <= blockNum; i++) {
      byte[] data = blockIdToData.get(i);
      byteBuffer.put(data, 0, data.length);
      curSize += data.length;
      if (curSize >= bytesPerSegment) {
        byte[] newByte = new byte[curSize];
        System.arraycopy(byteBuffer.array(), 0, newByte, 0, curSize);
        res.add(newByte);
        byteBuffer.clear();
        curSize = 0;
      }
    }
    if (curSize > 0) {
      byte[] newByte = new byte[curSize];
      System.arraycopy(byteBuffer.array(), 0, newByte, 0, curSize);
      res.add(newByte);
    }
    return res;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.uniffle.storage.handler.impl;

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class LocalFileServerReadHandlerTest {

  @Test
  public void testDataInconsistent() throws Exception {
    Map<Long, byte[]> expectedData = Maps.newHashMap();
    int expectTotalBlockNum = 4;
    int blockSize = 7;

    // Index buffer has room for every written entry (40 bytes each).
    ByteBuffer indexBuffer = ByteBuffer.allocate(expectTotalBlockNum * 40);
    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();

    // Simulate an index file that describes 4 blocks while the data file only
    // holds 3 of them, to exercise LocalFileClientReadHandler's inconsistency handling.
    LocalFileHandlerTestBase.writeTestData(shuffleBlocks -> {
      int nextOffset = 0;
      for (ShufflePartitionedBlock block : shuffleBlocks) {
        FileBasedShuffleSegment indexEntry = new FileBasedShuffleSegment(
            block.getBlockId(), nextOffset, block.getLength(), block.getUncompressLength(),
            block.getCrc(), block.getTaskAttemptId());
        nextOffset += block.getLength();
        LocalFileHandlerTestBase.writeIndex(indexBuffer, indexEntry);
      }
    }, expectTotalBlockNum, blockSize,
        expectedData, new HashSet<>());
    expectedData.keySet().forEach(expectBlockIds::addLong);

    String appId = "app1";
    int shuffleId = 1;
    int partitionId = 1;
    ShuffleServerClient mockShuffleServerClient = Mockito.mock(ShuffleServerClient.class);

    // The data file is one block shorter than the index claims.
    int actualWriteDataBlock = expectTotalBlockNum - 1;
    int actualFileLen = blockSize * actualWriteDataBlock;
    RssGetShuffleIndexResponse indexResponse = new RssGetShuffleIndexResponse(ResponseStatusCode.SUCCESS,
        indexBuffer.array(), actualFileLen);
    Mockito.doReturn(indexResponse).when(mockShuffleServerClient).getShuffleIndex(Mockito.any());

    int readBufferSize = 13;
    // Round the read buffer up to a whole number of blocks.
    int bytesPerSegment = ((readBufferSize / blockSize) + 1) * blockSize;
    List<byte[]> segments = LocalFileHandlerTestBase.calcSegmentBytes(expectedData,
        bytesPerSegment, actualWriteDataBlock);

    // First read covers two blocks, second read covers the remaining one.
    ArgumentMatcher<RssGetShuffleDataRequest> firstSegmentMatcher =
        (request) -> request.getOffset() == 0 && request.getLength() == bytesPerSegment;
    ArgumentMatcher<RssGetShuffleDataRequest> secondSegmentMatcher =
        (request) -> request.getOffset() == bytesPerSegment && request.getLength() == blockSize;
    RssGetShuffleDataResponse firstSegmentResponse =
        new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segments.get(0));
    RssGetShuffleDataResponse secondSegmentResponse =
        new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segments.get(1));

    Mockito.doReturn(firstSegmentResponse).when(mockShuffleServerClient)
        .getShuffleData(Mockito.argThat(firstSegmentMatcher));
    Mockito.doReturn(secondSegmentResponse).when(mockShuffleServerClient)
        .getShuffleData(Mockito.argThat(secondSegmentMatcher));

    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
    LocalFileClientReadHandler handler = new LocalFileClientReadHandler(appId, partitionId, shuffleId, -1, 1, 1,
        readBufferSize, expectBlockIds, processBlockIds, mockShuffleServerClient);

    // Only the blocks actually present in the data file must come back.
    int totalSegment = ((blockSize * actualWriteDataBlock) / bytesPerSegment) + 1;
    int readBlocks = 0;
    int segmentIndex = 0;
    while (segmentIndex < totalSegment) {
      ShuffleDataResult result = handler.readShuffleData();
      LocalFileHandlerTestBase.checkData(result, expectedData);
      readBlocks += result.getBufferSegments().size();
      segmentIndex++;
    }
    assertEquals(actualWriteDataBlock, readBlocks);
  }

}
