HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639106529
##########
cpp/celeborn/client/tests/PushStateTest.cpp:
##########
@@ -153,3 +192,67 @@ TEST_F(PushStateTest, throwException) {
}
EXPECT_TRUE(exceptionThrowed);
}
+
+// Pushes three batches to a single host from a worker thread. The first
+// `expectedAllowedBatches` batches are expected to pass limitMaxInFlight()
+// without being limited; the last one is expected to finish once batch 0 has
+// been removed.
+TEST_F(PushStateBytesSizeTest, limitMaxInFlightByBytesSize) {
+  const std::string hostAndPushPort = "xx.xx.xx.xx:8080";
+  const int expectedAllowedBatches = 2;
+  const int addBatchCalls = expectedAllowedBatches + 1;
+  // Written by the worker thread and read by this thread while the worker is
+  // still running, so each mark must be atomic. std::vector<bool> packs its
+  // elements into shared words and offers no such guarantee.
+  std::vector<std::atomic<bool>> addBatchMarks(addBatchCalls);
+  for (auto& mark : addBatchMarks) {
+    mark.store(false);
+  }
+
+  std::thread addBatchThread([&]() {
+    for (auto i = 0; i < addBatchCalls; i++) {
+      pushState_->addBatch(i, batchSize_, hostAndPushPort);
+      auto result = pushState_->limitMaxInFlight(hostAndPushPort);
+      addBatchMarks[i].store(true);
+      if (i < expectedAllowedBatches) {
+        EXPECT_FALSE(result) << "Batch " << i << " should be within limits";
+      }
+    }
+  });
+
+  // Give the worker time to get through the batches that are not limited.
+  std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
+  for (auto i = 0; i < expectedAllowedBatches; i++) {
+    EXPECT_TRUE(addBatchMarks[i].load())
+        << "Batch " << i << " should have completed";
+  }
+
+  // Removing one in-flight batch should let the worker's last call return.
+  pushState_->removeBatch(0, hostAndPushPort);
+  addBatchThread.join();
+  EXPECT_TRUE(addBatchMarks[expectedAllowedBatches].load());
+}
+
+// Spreads batches over two hosts: one batch per host is expected to pass
+// limitMaxInFlight(), while a further batch is expected to stay blocked until
+// an earlier batch is removed.
+TEST_F(PushStateBytesSizeTest, limitMaxInFlightByTotalBytesSize) {
+  const std::string hostAndPushPort1{"xx.xx.xx.xx:8080"};
+  const std::string hostAndPushPort2{"yy.yy.yy.yy:8080"};
+
+  // One batch on each host; neither call is expected to be limited.
+  pushState_->addBatch(0, batchSize_, hostAndPushPort1);
+  EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort1));
+
+  pushState_->addBatch(1, batchSize_, hostAndPushPort2);
+  EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort2));
+
+  // Set by the worker only after its limitMaxInFlight() call has returned.
+  std::atomic<bool> thirdBatchCompleted{false};
+  std::thread blockedPusher([this, &hostAndPushPort1, &thirdBatchCompleted]() {
+    pushState_->addBatch(2, batchSize_, hostAndPushPort1);
+    pushState_->limitMaxInFlight(hostAndPushPort1);
+    thirdBatchCompleted.store(true);
+  });
+
+  // Wait, then check that the worker has not returned yet.
+  const auto settleTime = std::chrono::milliseconds(pushSleepDeltaMs_);
+  std::this_thread::sleep_for(settleTime);
+  EXPECT_FALSE(thirdBatchCompleted.load())
+      << "Third batch should be blocked due to total bytes limit";
+
+  // Remove batch 0 and wait for the worker to finish.
+  pushState_->removeBatch(0, hostAndPushPort1);
+  blockedPusher.join();
+
+  EXPECT_TRUE(thirdBatchCompleted.load());
+}
+
+// After cleanup(), limitMaxInFlight() is expected to report no limiting for a
+// host that previously had two batches registered.
+TEST_F(PushStateBytesSizeTest, cleanupClearsBytesSizeTracking) {
+  const std::string hostAndPushPort{"xx.xx.xx.xx:8080"};
+
+  // Register batches 0 and 1 against the same host.
+  for (int batchId = 0; batchId < 2; ++batchId) {
+    pushState_->addBatch(batchId, batchSize_, hostAndPushPort);
+  }
+  pushState_->cleanup();
+
+  EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort));
+}
Review Comment:
There should be an empty line at the end of the file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]