This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ad5a88161 [CELEBORN-2267][FOLLOWUP] Add Cpp-Write Java-Read integration tests for LZ4 and ZSTD
ad5a88161 is described below

commit ad5a88161cdd6febfb499f00dfbedd8ecb9d9d2a
Author: afterincomparableyum 
<[email protected]>
AuthorDate: Thu Feb 26 11:24:37 2026 +0800

    [CELEBORN-2267][FOLLOWUP] Add Cpp-Write Java-Read integration tests for LZ4 and ZSTD
    
    This is a follow-up to https://github.com/apache/celeborn/pull/3575
    
    - Add a compression codec argument to the C++ DataSumWithWriterClient and set it in CelebornConf so the writer uses LZ4/ZSTD when enabled
    - Pass the codec from runCppWriteJavaRead to the C++ writer command
    - Add CppWriteJavaReadTestWithLZ4 and CppWriteJavaReadTestWithZSTD (mirroring CppWriteJavaReadTestWithNONE)
    
    ### How was this patch tested?
    
    I compiled and ran the tests locally; all passed.
    
    Closes #3606 from afterincomparableyum/cpp-client/celeborn-2267.
    
    Authored-by: afterincomparableyum 
<[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .github/workflows/cpp_integration.yml              | 14 +++++++++++
 cpp/celeborn/tests/DataSumWithWriterClient.cpp     |  8 +++++--
 .../cluster/CppWriteJavaReadTestWithLZ4.scala      | 27 ++++++++++++++++++++++
 .../cluster/CppWriteJavaReadTestWithZSTD.scala     | 27 ++++++++++++++++++++++
 .../cluster/JavaCppHybridReadWriteTestBase.scala   |  5 ++--
 5 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/cpp_integration.yml 
b/.github/workflows/cpp_integration.yml
index 4f04c4bad..804f98c82 100644
--- a/.github/workflows/cpp_integration.yml
+++ b/.github/workflows/cpp_integration.yml
@@ -112,4 +112,18 @@ jobs:
             test-compile exec:java \
             -Dexec.classpathScope="test" \
             
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithNONE"
 \
+            -Dexec.args="-XX:MaxDirectMemorySize=2G"
+      - name: Run Cpp-Write Java-Read Hybrid Integration Test (LZ4 Compression)
+        run: |
+          build/mvn -pl worker \
+            test-compile exec:java \
+            -Dexec.classpathScope="test" \
+            
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithLZ4"
 \
+            -Dexec.args="-XX:MaxDirectMemorySize=2G"
+      - name: Run Cpp-Write Java-Read Hybrid Integration Test (ZSTD 
Compression)
+        run: |
+          build/mvn -pl worker \
+            test-compile exec:java \
+            -Dexec.classpathScope="test" \
+            
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithZSTD"
 \
             -Dexec.args="-XX:MaxDirectMemorySize=2G"
\ No newline at end of file
diff --git a/cpp/celeborn/tests/DataSumWithWriterClient.cpp 
b/cpp/celeborn/tests/DataSumWithWriterClient.cpp
index ceaa01beb..ec7aeb520 100644
--- a/cpp/celeborn/tests/DataSumWithWriterClient.cpp
+++ b/cpp/celeborn/tests/DataSumWithWriterClient.cpp
@@ -25,7 +25,7 @@
 int main(int argc, char** argv) {
   folly::init(&argc, &argv, false);
   // Read the configs.
-  assert(argc == 9);
+  assert(argc == 10);
   std::string lifecycleManagerHost = argv[1];
   int lifecycleManagerPort = std::atoi(argv[2]);
   std::string appUniqueId = argv[3];
@@ -34,16 +34,20 @@ int main(int argc, char** argv) {
   int numMappers = std::atoi(argv[6]);
   int numPartitions = std::atoi(argv[7]);
   std::string resultFile = argv[8];
+  std::string compressCodec = argv[9];
   std::cout << "lifecycleManagerHost = " << lifecycleManagerHost
             << ", lifecycleManagerPort = " << lifecycleManagerPort
             << ", appUniqueId = " << appUniqueId
             << ", shuffleId = " << shuffleId << ", attemptId = " << attemptId
             << ", numMappers = " << numMappers
             << ", numPartitions = " << numPartitions
-            << ", resultFile = " << resultFile << std::endl;
+            << ", resultFile = " << resultFile
+            << ", compressCodec = " << compressCodec << std::endl;
 
   // Create shuffleClient and setup.
   auto conf = std::make_shared<celeborn::conf::CelebornConf>();
+  conf->registerProperty(
+      celeborn::conf::CelebornConf::kShuffleCompressionCodec, compressCodec);
   auto clientEndpoint =
       std::make_shared<celeborn::client::ShuffleClientEndpoint>(conf);
   auto shuffleClient = celeborn::client::ShuffleClientImpl::create(
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithLZ4.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithLZ4.scala
new file mode 100644
index 000000000..0f3528856
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithLZ4.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+object CppWriteJavaReadTestWithLZ4 extends JavaCppHybridReadWriteTestBase {
+
+  def main(args: Array[String]) = {
+    testCppWriteJavaRead(CompressionCodec.LZ4)
+  }
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithZSTD.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithZSTD.scala
new file mode 100644
index 000000000..63d6f5d73
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppWriteJavaReadTestWithZSTD.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+object CppWriteJavaReadTestWithZSTD extends JavaCppHybridReadWriteTestBase {
+
+  def main(args: Array[String]) = {
+    testCppWriteJavaRead(CompressionCodec.ZSTD)
+  }
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
index 325f9c8b7..621841826 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
@@ -188,9 +188,10 @@ trait JavaCppHybridReadWriteTestBase extends AnyFunSuite
     val cppBinRelativeDirectory = "cpp/build/celeborn/tests/"
     val cppBinFileName = "cppDataSumWithWriterClient"
     val cppBinFilePath = 
s"$projectDirectory/$cppBinRelativeDirectory/$cppBinFileName"
-    // Execution command: $exec lifecycleManagerHost lifecycleManagerPort 
appUniqueId shuffleId attemptId numMappers numPartitions cppResultFile
+    val cppCodec = codec.name()
+    // Execution command: $exec lifecycleManagerHost lifecycleManagerPort 
appUniqueId shuffleId attemptId numMappers numPartitions cppResultFile cppCodec
     val command = {
-      s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort 
$appUniqueId $shuffleId $attemptId $numMappers $numPartitions $cppResultFile"
+      s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort 
$appUniqueId $shuffleId $attemptId $numMappers $numPartitions $cppResultFile 
$cppCodec"
     }
     println(s"run command: $command")
     val commandOutput = runCommand(command)

Reply via email to