This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 44e2c793e [CELEBORN-1958][CIP-14] Add testsuite to test writing with
javaClient and reading with cppClient
44e2c793e is described below
commit 44e2c793e2477e8f819a977c88213d27e067c85a
Author: HolyLow <[email protected]>
AuthorDate: Tue Apr 15 17:04:23 2025 +0800
[CELEBORN-1958][CIP-14] Add testsuite to test writing with javaClient and
reading with cppClient
### What changes were proposed in this pull request?
This PR adds a joint test suite spanning the Java and C++ code, which tests
writing data with javaClient and reading it back with cppClient. In addition,
a CI/CD workflow is added to verify the code automatically.
### Why are the changes needed?
The joint test suite can be used to verify the cppClient's correctness.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and CICD procedure.
Closes #3202 from
HolyLow/issue/celeborn-1958-add-integration-test-to-cppclient.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.github/workflows/cpp_integration.yml | 39 +++++-
cpp/celeborn/CMakeLists.txt | 4 +
cpp/celeborn/{ => tests}/CMakeLists.txt | 31 ++++-
cpp/celeborn/tests/DataSumWithReaderClient.cpp | 85 ++++++++++++
.../deploy/cluster/JavaReadCppWriteTestBase.scala | 143 +++++++++++++++++++++
.../cluster/JavaReadCppWriteTestWithNONE.scala | 27 ++++
6 files changed, 320 insertions(+), 9 deletions(-)
diff --git a/.github/workflows/cpp_integration.yml
b/.github/workflows/cpp_integration.yml
index 85954623b..03eb81059 100644
--- a/.github/workflows/cpp_integration.yml
+++ b/.github/workflows/cpp_integration.yml
@@ -28,7 +28,7 @@ on:
- branch-*
jobs:
- celeborn_cpp_integration_test:
+ celeborn_cpp_unit_test:
runs-on: ubuntu-22.04
container: holylow/celeborn-cpp-dev:0.2
steps:
@@ -43,4 +43,39 @@ jobs:
cmake .. && make -j
- name: Run Unittests of Celeborn Cpp
working-directory: ./cpp/build
- run: ctest
\ No newline at end of file
+ run: ctest
+  celeborn_cpp_integration_test:
+    runs-on: ubuntu-22.04
+    container: holylow/celeborn-cpp-dev:0.2
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          persist-credentials: 'false'
+          fetch-depth: 0
+      - name: Compile Celeborn Cpp
+        working-directory: ./cpp
+        run: |
+          rm -rf build && mkdir -p build && cd build
+          cmake .. && make -j
+      # Inside the container, the setup-java step would pollute $PATH and make
+      # the following steps fail, so explicitly re-register the standard system
+      # directories through $GITHUB_PATH before it runs.
+      - name: Backup Path
+        run: |
+          echo $PATH
+          echo "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" >>"$GITHUB_PATH"
+      - name: Setup JDK 8
+        uses: actions/setup-java@v4
+        with:
+          distribution: zulu
+          java-version: 8
+          cache: maven
+          check-latest: false
+      - name: Compile & Install Celeborn Java
+        run: build/mvn clean install -DskipTests
+      - name: Run Java-Cpp Hybrid Integration Test
+        # exec:java runs the main class inside Maven's own JVM, and -Dexec.args
+        # only becomes main()'s program arguments, so JVM options such as
+        # -XX:MaxDirectMemorySize have to be passed through MAVEN_OPTS instead.
+        # NOTE(review): a non-empty MAVEN_OPTS may replace build/mvn's default
+        # JVM options; the first three flags are meant to mirror those defaults
+        # - verify against build/mvn.
+        env:
+          MAVEN_OPTS: -Xss128m -Xmx4g -XX:ReservedCodeCacheSize=128m -XX:MaxDirectMemorySize=2G
+        run: |
+          build/mvn -pl worker \
+            test-compile exec:java \
+            -Dexec.classpathScope="test" \
+            -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaReadCppWriteTestWithNONE"
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/CMakeLists.txt
index b1effedb6..1588488a7 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/CMakeLists.txt
@@ -19,3 +19,7 @@ add_subdirectory(conf)
add_subdirectory(protocol)
add_subdirectory(network)
add_subdirectory(client)
+
+if(CELEBORN_BUILD_TESTS)
+ add_subdirectory(tests)
+endif()
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/tests/CMakeLists.txt
similarity index 59%
copy from cpp/celeborn/CMakeLists.txt
copy to cpp/celeborn/tests/CMakeLists.txt
index b1effedb6..0bc5e41c9 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/tests/CMakeLists.txt
@@ -12,10 +12,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-add_subdirectory(proto)
-add_subdirectory(memory)
-add_subdirectory(utils)
-add_subdirectory(conf)
-add_subdirectory(protocol)
-add_subdirectory(network)
-add_subdirectory(client)
+# DataSumWithReaderClient.cpp only defines main(), so it is compiled exactly
+# once, into the executable below. The dataSumWithReaderClient INTERFACE
+# target just carries the link dependencies that executable needs, instead of
+# being a separate library that would contain a second copy of main().
+add_library(dataSumWithReaderClient INTERFACE)
+
+target_link_libraries(
+  dataSumWithReaderClient
+  INTERFACE
+  memory
+  utils
+  conf
+  proto
+  network
+  protocol
+  client
+  ${WANGLE}
+  ${FIZZ}
+  ${LIBSODIUM_LIBRARY}
+  ${FOLLY_WITH_DEPENDENCIES}
+  ${GLOG}
+  ${GFLAGS_LIBRARIES}
+)
+
+# Reader binary launched by the Java-side hybrid integration test.
+add_executable(cppDataSumWithReaderClient DataSumWithReaderClient.cpp)
+
+target_link_libraries(cppDataSumWithReaderClient dataSumWithReaderClient)
\ No newline at end of file
diff --git a/cpp/celeborn/tests/DataSumWithReaderClient.cpp
b/cpp/celeborn/tests/DataSumWithReaderClient.cpp
new file mode 100644
index 000000000..d60166e92
--- /dev/null
+++ b/cpp/celeborn/tests/DataSumWithReaderClient.cpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cassert>
+#include <climits>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <celeborn/client/ShuffleClient.h>
+
+// Reads every partition of one shuffle through the C++ ShuffleClient, sums
+// the decimal records in each partition and writes one sum per line, in
+// partition order, to resultFile.
+//
+// Usage: cppDataSumWithReaderClient lifecycleManagerHost lifecycleManagerPort
+//        appUniqueId shuffleId attemptId numPartitions resultFile
+// Returns 0 on success and 1 on bad arguments, malformed data or I/O failure.
+int main(int argc, char** argv) {
+  // Validate the argument count explicitly: assert() is compiled out under
+  // NDEBUG, which would let the argv accesses below run out of bounds.
+  if (argc != 8) {
+    std::cerr << "Usage: cppDataSumWithReaderClient lifecycleManagerHost "
+              << "lifecycleManagerPort appUniqueId shuffleId attemptId "
+              << "numPartitions resultFile" << std::endl;
+    return 1;
+  }
+
+  // Read the configs.
+  std::string lifecycleManagerHost = argv[1];
+  int lifecycleManagerPort = std::atoi(argv[2]);
+  std::string appUniqueId = argv[3];
+  int shuffleId = std::atoi(argv[4]);
+  int attemptId = std::atoi(argv[5]);
+  int numPartitions = std::atoi(argv[6]);
+  std::string resultFile = argv[7];
+  std::cout << "lifecycleManagerHost = " << lifecycleManagerHost
+            << ", lifecycleManagerPort = " << lifecycleManagerPort
+            << ", appUniqueId = " << appUniqueId
+            << ", shuffleId = " << shuffleId << ", attemptId = " << attemptId
+            << ", numPartitions = " << numPartitions
+            << ", resultFile = " << resultFile << std::endl;
+  if (numPartitions < 0) {
+    std::cerr << "numPartitions must be non-negative" << std::endl;
+    return 1;
+  }
+
+  // Remove any previous result before doing any work, so that an early
+  // failure below cannot leave a stale file behind for the caller to verify.
+  std::remove(resultFile.c_str());
+
+  // Create shuffleClient and setup.
+  auto conf = std::make_shared<celeborn::conf::CelebornConf>();
+  auto clientFactory =
+      std::make_shared<celeborn::network::TransportClientFactory>(conf);
+  auto shuffleClient = std::make_unique<celeborn::client::ShuffleClientImpl>(
+      appUniqueId, conf, clientFactory);
+  shuffleClient->setupLifecycleManagerRef(
+      lifecycleManagerHost, lifecycleManagerPort);
+
+  // Read data, parse data and sum up. Records are a '-' followed by a
+  // non-negative decimal number (the format the Java side of this test
+  // pushes), so '-' acts as a separator and every other byte must be a digit.
+  // int64_t rather than long: long's width is platform-dependent, and with the
+  // Java test's parameters a partition total can exceed 2^31.
+  std::vector<int64_t> result(numPartitions, 0);
+  shuffleClient->updateReducerFileGroup(shuffleId);
+  for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+    auto inputStream = shuffleClient->readPartition(
+        shuffleId, partitionId, attemptId, 0, INT_MAX);
+    char c;
+    int64_t data = 0;
+    int dataCnt = 0;
+    while (inputStream->read(reinterpret_cast<uint8_t*>(&c), 0, 1) > 0) {
+      if (c == '-') {
+        result[partitionId] += data;
+        data = 0;
+        dataCnt++;
+        continue;
+      }
+      // Checked explicitly (not via assert) so malformed data fails in
+      // release builds too instead of producing a silently wrong sum.
+      if (c < '0' || c > '9') {
+        std::cerr << "partition " << partitionId << " contains unexpected byte "
+                  << static_cast<int>(static_cast<unsigned char>(c))
+                  << std::endl;
+        return 1;
+      }
+      data = data * 10 + (c - '0');
+    }
+    // Flush the last record, which has no trailing separator.
+    result[partitionId] += data;
+    std::cout << "partition " << partitionId
+              << " sum result = " << result[partitionId]
+              << ", dataCnt = " << dataCnt << std::endl;
+  }
+
+  // Write one sum per line, in partition order.
+  std::ofstream of(resultFile);
+  if (!of) {
+    std::cerr << "failed to open result file " << resultFile << std::endl;
+    return 1;
+  }
+  for (int64_t sum : result) {
+    of << sum << '\n';
+  }
+  of.close();
+  if (!of) {
+    std::cerr << "failed to write result file " << resultFile << std::endl;
+    return 1;
+  }
+
+  return 0;
+}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
new file mode 100644
index 000000000..a57ffebe8
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import java.io.File
+import java.util
+
+import scala.io.Source
+import scala.util.Random
+
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.client.{LifecycleManager, ShuffleClientImpl}
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.CompressionCodec
+import org.apache.celeborn.common.util.Utils.runCommand
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+
+/**
+ * Hybrid Java/C++ integration test: the Java [[ShuffleClientImpl]] pushes
+ * random records into a Celeborn mini cluster. The C++
+ * `cppDataSumWithReaderClient` binary then reads every partition back and
+ * writes per-partition sums, which are checked against sums computed here.
+ *
+ * NOTE: despite the "JavaReadCppWrite" name, the data is written by Java and
+ * read by C++.
+ */
+trait JavaReadCppWriteTestBase extends AnyFunSuite
+  with Logging with MiniClusterFeature with BeforeAndAfterAll {
+
+  var masterPort = 0
+
+  override def beforeAll(): Unit = {
+    logInfo("test initialized, setup Celeborn mini cluster")
+    val (m, _) = setupMiniClusterWithRandomPorts()
+    masterPort = m.conf.masterPort
+  }
+
+  override def afterAll(): Unit = {
+    logInfo("all test complete, stop Celeborn mini cluster")
+    shutdownMiniCluster()
+  }
+
+  /**
+   * Runs [[runJavaReadCppWrite]] between explicit [[beforeAll]]/[[afterAll]]
+   * calls, so it can be driven from a plain `main` instead of a ScalaTest
+   * runner. The mini cluster is shut down even if the check fails.
+   */
+  def testJavaReadCppWrite(codec: CompressionCodec): Unit = {
+    beforeAll()
+    try {
+      runJavaReadCppWrite(codec)
+    } finally {
+      afterAll()
+    }
+  }
+
+  /**
+   * Pushes random data with the Java client using `codec`, runs the C++
+   * reader on the same shuffle and asserts that its per-partition sums match.
+   * The lifecycle manager and shuffle client are released even on failure.
+   */
+  def runJavaReadCppWrite(codec: CompressionCodec): Unit = {
+    val appUniqueId = "test-app"
+    val shuffleId = 0
+    val attemptId = 0
+    val numMappers = 2
+    val numPartitions = 2
+
+    // Create lifecycleManager.
+    val clientConf = new CelebornConf()
+      .set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")
+      .set(CelebornConf.SHUFFLE_COMPRESSION_CODEC.key, codec.name)
+      .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
+      .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
+      .set(CelebornConf.READ_LOCAL_SHUFFLE_FILE, false)
+      .set("celeborn.data.io.numConnectionsPerPeer", "1")
+    val lifecycleManager = new LifecycleManager(appUniqueId, clientConf)
+    try {
+      // Create writer shuffleClient.
+      val shuffleClient =
+        new ShuffleClientImpl(appUniqueId, clientConf, UserIdentifier("mock", "mock"))
+      try {
+        shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)
+        val sums = pushRandomData(shuffleClient, shuffleId, attemptId, numMappers, numPartitions)
+        // A fresh temp file per run keeps repeated or concurrent runs from
+        // reading each other's results.
+        val cppResultFile = File.createTempFile("celeborn-cpp-result", ".txt")
+        try {
+          runCppReader(
+            lifecycleManager,
+            appUniqueId,
+            shuffleId,
+            attemptId,
+            numPartitions,
+            cppResultFile)
+          verifyCppResult(cppResultFile, sums)
+        } finally {
+          cppResultFile.delete()
+        }
+      } finally {
+        shuffleClient.shutdown()
+      }
+    } finally {
+      lifecycleManager.stop()
+    }
+  }
+
+  /**
+   * Pushes `numData` random records per (mapper, partition), each encoded as
+   * "-<decimal>", then flushes and ends every mapper.
+   *
+   * @return the expected sum of all pushed values, indexed by partition id
+   */
+  private def pushRandomData(
+      shuffleClient: ShuffleClientImpl,
+      shuffleId: Int,
+      attemptId: Int,
+      numMappers: Int,
+      numPartitions: Int,
+      numData: Int = 1000,
+      maxData: Int = 1000000): Array[Long] = {
+    // Allocated once, outside the mapper loop, so that all mappers accumulate
+    // into the same numPartitions totals.
+    val sums = new Array[Long](numPartitions)
+    val rand = new Random()
+    for (mapId <- 0 until numMappers) {
+      for (partitionId <- 0 until numPartitions; _ <- 0 until numData) {
+        val data = rand.nextInt(maxData)
+        sums(partitionId) += data
+        // Length is taken from the encoded bytes actually pushed.
+        val bytes = ("-" + data).getBytes("UTF-8")
+        shuffleClient.pushOrMergeData(
+          shuffleId,
+          mapId,
+          attemptId,
+          partitionId,
+          bytes,
+          0,
+          bytes.length,
+          numMappers,
+          numPartitions,
+          false,
+          true)
+      }
+      shuffleClient.pushMergedData(shuffleId, mapId, attemptId)
+      shuffleClient.mapperEnd(shuffleId, mapId, attemptId, numMappers)
+    }
+    sums
+  }
+
+  /**
+   * Launches the C++ reader binary, which is expected under
+   * cpp/build/celeborn/tests relative to the working directory, and has it
+   * write one sum per partition to `resultFile`.
+   */
+  private def runCppReader(
+      lifecycleManager: LifecycleManager,
+      appUniqueId: String,
+      shuffleId: Int,
+      attemptId: Int,
+      numPartitions: Int,
+      resultFile: File): Unit = {
+    val cppBinFile = new File("cpp/build/celeborn/tests", "cppDataSumWithReaderClient")
+    Assert.assertTrue(
+      s"C++ reader binary not found at ${cppBinFile.getAbsolutePath}; build cpp/ first",
+      cppBinFile.isFile)
+    // Execution command: $exec lifecycleManagerHost lifecycleManagerPort
+    // appUniqueId shuffleId attemptId numPartitions cppResultFile
+    val command = Seq(
+      cppBinFile.getAbsolutePath,
+      lifecycleManager.getHost,
+      lifecycleManager.getPort,
+      appUniqueId,
+      shuffleId,
+      attemptId,
+      numPartitions,
+      resultFile.getAbsolutePath).mkString(" ")
+    println(s"run command: $command")
+    val commandOutput = runCommand(command)
+    println(s"command output: $commandOutput")
+  }
+
+  /**
+   * Asserts that `resultFile` holds exactly one line per partition, and that
+   * each line equals the corresponding expected sum.
+   */
+  private def verifyCppResult(resultFile: File, expectedSums: Array[Long]): Unit = {
+    val source = Source.fromFile(resultFile, "utf-8")
+    val lines =
+      try source.getLines().toList
+      finally source.close()
+    // Check the count first so extra or missing lines fail with a clear message.
+    Assert.assertEquals(
+      "unexpected number of partition results",
+      expectedSums.length,
+      lines.length)
+    lines.zipWithIndex.foreach { case (line, partitionId) =>
+      Assert.assertEquals(
+        s"sum mismatch for partition $partitionId",
+        expectedSums(partitionId),
+        line.trim.toLong)
+    }
+  }
+
+}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
new file mode 100644
index 000000000..4aa5b0ed7
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+/**
+ * Entry point of the Java-write / C++-read hybrid test using the NONE
+ * compression codec. CI launches it through `exec:java`. Command-line
+ * arguments are ignored.
+ */
+object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+
+  def main(args: Array[String]): Unit = {
+    testJavaReadCppWrite(CompressionCodec.NONE)
+  }
+}