This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ad5a88161 [CELEBORN-2267][FOLLOWUP] Add Cpp-Write Java-Read integration tests for LZ4 and ZSTD
ad5a88161 is described below
commit ad5a88161cdd6febfb499f00dfbedd8ecb9d9d2a
Author: afterincomparableyum
<[email protected]>
AuthorDate: Thu Feb 26 11:24:37 2026 +0800
[CELEBORN-2267][FOLLOWUP] Add Cpp-Write Java-Read integration tests for LZ4 and ZSTD
This is a follow-up to https://github.com/apache/celeborn/pull/3575
- Add a compression codec argument to the C++ DataSumWithWriterClient and set it
in CelebornConf so the writer uses the requested codec (e.g. LZ4 or ZSTD)
- Pass the codec from runCppWriteJavaRead to the C++ writer command
- Add CppWriteJavaReadTestWithLZ4 and CppWriteJavaReadTestWithZSTD (mirroring
CppWriteJavaReadTestWithNONE) and run them in the cpp_integration CI workflow
### How was this patch tested?
I compiled and ran the tests locally; all passed.
Closes #3606 from afterincomparableyum/cpp-client/celeborn-2267.
Authored-by: afterincomparableyum
<[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.github/workflows/cpp_integration.yml | 14 +++++++++++
cpp/celeborn/tests/DataSumWithWriterClient.cpp | 8 +++++--
.../cluster/CppWriteJavaReadTestWithLZ4.scala | 27 ++++++++++++++++++++++
.../cluster/CppWriteJavaReadTestWithZSTD.scala | 27 ++++++++++++++++++++++
.../cluster/JavaCppHybridReadWriteTestBase.scala | 5 ++--
5 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/cpp_integration.yml b/.github/workflows/cpp_integration.yml
index 4f04c4bad..804f98c82 100644
--- a/.github/workflows/cpp_integration.yml
+++ b/.github/workflows/cpp_integration.yml
@@ -112,4 +112,18 @@ jobs:
test-compile exec:java \
-Dexec.classpathScope="test" \
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithNONE" \
+ -Dexec.args="-XX:MaxDirectMemorySize=2G"
+ - name: Run Cpp-Write Java-Read Hybrid Integration Test (LZ4 Compression)
+ run: |
+ build/mvn -pl worker \
+ test-compile exec:java \
+ -Dexec.classpathScope="test" \
+ -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithLZ4" \
+ -Dexec.args="-XX:MaxDirectMemorySize=2G"
+ - name: Run Cpp-Write Java-Read Hybrid Integration Test (ZSTD Compression)
+ run: |
+ build/mvn -pl worker \
+ test-compile exec:java \
+ -Dexec.classpathScope="test" \
+ -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithZSTD" \
-Dexec.args="-XX:MaxDirectMemorySize=2G"
\ No newline at end of file
diff --git a/cpp/celeborn/tests/DataSumWithWriterClient.cpp b/cpp/celeborn/tests/DataSumWithWriterClient.cpp
index ceaa01beb..ec7aeb520 100644
--- a/cpp/celeborn/tests/DataSumWithWriterClient.cpp
+++ b/cpp/celeborn/tests/DataSumWithWriterClient.cpp
@@ -25,7 +25,7 @@
int main(int argc, char** argv) {
folly::init(&argc, &argv, false);
// Read the configs.
- assert(argc == 9);
+ assert(argc == 10);
std::string lifecycleManagerHost = argv[1];
int lifecycleManagerPort = std::atoi(argv[2]);
std::string appUniqueId = argv[3];
@@ -34,16 +34,20 @@ int main(int argc, char** argv) {
int numMappers = std::atoi(argv[6]);
int numPartitions = std::atoi(argv[7]);
std::string resultFile = argv[8];
+ std::string compressCodec = argv[9];
std::cout << "lifecycleManagerHost = " << lifecycleManagerHost
<< ", lifecycleManagerPort = " << lifecycleManagerPort
<< ", appUniqueId = " << appUniqueId
<< ", shuffleId = " << shuffleId << ", attemptId = " << attemptId
<< ", numMappers = " << numMappers
<< ", numPartitions = " << numPartitions
- << ", resultFile = " << resultFile << std::endl;
+ << ", resultFile = " << resultFile
+ << ", compressCodec = " << compressCodec << std::endl;
// Create shuffleClient and setup.
auto conf = std::make_shared<celeborn::conf::CelebornConf>();
+ conf->registerProperty(
+ celeborn::conf::CelebornConf::kShuffleCompressionCodec, compressCodec);
auto clientEndpoint =
std::make_shared<celeborn::client::ShuffleClientEndpoint>(conf);
auto shuffleClient = celeborn::client::ShuffleClientImpl::create(
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithLZ4.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithLZ4.scala
new file mode 100644
index 000000000..0f3528856
--- /dev/null
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithLZ4.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+object CppWriteJavaReadTestWithLZ4 extends JavaCppHybridReadWriteTestBase {
+
+ def main(args: Array[String]) = {
+ testCppWriteJavaRead(CompressionCodec.LZ4)
+ }
+}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithZSTD.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithZSTD.scala
new file mode 100644
index 000000000..63d6f5d73
--- /dev/null
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithZSTD.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+object CppWriteJavaReadTestWithZSTD extends JavaCppHybridReadWriteTestBase {
+
+ def main(args: Array[String]) = {
+ testCppWriteJavaRead(CompressionCodec.ZSTD)
+ }
+}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
index 325f9c8b7..621841826 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
@@ -188,9 +188,10 @@ trait JavaCppHybridReadWriteTestBase extends AnyFunSuite
val cppBinRelativeDirectory = "cpp/build/celeborn/tests/"
val cppBinFileName = "cppDataSumWithWriterClient"
val cppBinFilePath =
s"$projectDirectory/$cppBinRelativeDirectory/$cppBinFileName"
- // Execution command: $exec lifecycleManagerHost lifecycleManagerPort appUniqueId shuffleId attemptId numMappers numPartitions cppResultFile
+ val cppCodec = codec.name()
+ // Execution command: $exec lifecycleManagerHost lifecycleManagerPort appUniqueId shuffleId attemptId numMappers numPartitions cppResultFile cppCodec
val command = {
- s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numMappers $numPartitions $cppResultFile"
+ s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numMappers $numPartitions $cppResultFile $cppCodec"
}
println(s"run command: $command")
val commandOutput = runCommand(command)