This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ec38fcce3 [CELEBORN-2254] Fix support for S3 and add a simple integration test
ec38fcce3 is described below

commit ec38fcce383cd14252fe0e7074deddafc91e5f3f
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Feb 5 13:39:12 2026 +0800

    [CELEBORN-2254] Fix support for S3 and add a simple integration test
    
    ### What changes were proposed in this pull request?
    * Fix creating files on S3 (and other DFS backends)
    * Add an integration test for Spark and S3 (using MinIO); a sketch of the Spark-side settings it exercises follows this list
    * In CI, some jobs now run with the aws Maven profile, which activates the new integration test (it needs the S3 client dependencies)
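
    A minimal sketch of those Spark-side settings, mirroring updateSparkConf
    in the test added below (the endpoint URL and bucket are placeholders,
    credential settings are omitted for brevity, and the s3a endpoint and
    path-style keys are only needed for S3-compatible stores such as MinIO):

        import org.apache.spark.SparkConf
        import org.apache.celeborn.common.CelebornConf

        val sparkConf = new SparkConf()
          .set("spark." + CelebornConf.ACTIVE_STORAGE_TYPES.key, "MEMORY,S3")
          .set("spark." + CelebornConf.S3_DIR.key, "s3://sample-bucket/test/celeborn")
          .set("spark." + CelebornConf.S3_ENDPOINT_REGION.key, "dummy-region")
          .set("spark.celeborn.hadoop.fs.s3a.endpoint", "http://localhost:9000")
          .set("spark.celeborn.hadoop.fs.s3a.path.style.access", "true")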
    
    ### Why are the changes needed?
    see https://issues.apache.org/jira/browse/CELEBORN-2254
    
    ### Does this PR resolve a correctness bug?
    
    No
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    * I have added an integration test
    * I have been running this patch on our internal fork, to make Celeborn run on k8s with S3
    
    Closes #3592 from eolivelli/CELEBORN-2254-apache.
    
    Authored-by: Enrico Olivelli <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .github/workflows/maven.yml                        |   6 +-
 .../org/apache/celeborn/client/ShuffleClient.java  |   2 +
 .../apache/celeborn/client/ShuffleClientImpl.java  |   8 ++
 .../celeborn/common/util/CelebornHadoopUtils.scala |   1 +
 .../apache/celeborn/S3MultipartUploadHandler.java  |  33 +++--
 pom.xml                                            |   7 ++
 project/CelebornBuild.scala                        |   8 +-
 tests/spark-it/pom.xml                             |   5 +
 .../spark/s3/BasicEndToEndTieredStorageTest.scala  | 133 +++++++++++++++++++++
 .../deploy/worker/storage/StorageManager.scala     |   9 +-
 .../deploy/worker/storage/StoragePolicy.scala      |   2 +
 11 files changed, 198 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 7f357d9f0..513e3a891 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -133,7 +133,8 @@ jobs:
       run: |
         SPARK_BINARY_VERSION=${{ matrix.spark }}
         SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
-        PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
+        # enable AWS profile to run end-to-end tests with S3 storage
+        PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
        TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MAJOR_VERSION},client-spark/spark-${SPARK_MAJOR_VERSION}-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
         build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
        build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
@@ -170,7 +171,8 @@ jobs:
         run: |
           SPARK_BINARY_VERSION=${{ matrix.spark }}
           SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
-          PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
+          # enable AWS profile to run end-to-end tests with S3 storage
+          PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
          TEST_MODULES="client-spark/common,client-spark/spark-3,client-spark/spark-3-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
           build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
          build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
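
To reproduce locally roughly what these CI jobs now run (spark-3.5 is only an
example matrix value; any supported entry works the same way):

    PROFILES="-Pgoogle-mirror,aws,spark-3.5"
    build/mvn $PROFILES -pl tests/spark-it -am clean install -DskipTests
    build/mvn $PROFILES -pl tests/spark-it test
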
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index e4c473a9b..7a89b051d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -19,6 +19,7 @@ package org.apache.celeborn.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -122,6 +123,7 @@ public abstract class ShuffleClient {
             hadoopFs = CelebornHadoopUtils.getHadoopFS(conf);
           } catch (Exception e) {
             logger.error("Celeborn initialize DFS failed.", e);
+            hadoopFs = Collections.emptyMap();
           }
         }
       }
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b55420386..be2bdf87d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -36,6 +36,7 @@ import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -231,6 +232,13 @@ public class ShuffleClientImpl extends ShuffleClient {
 
     reviveManager = new ReviveManager(this, conf);
 
+    if (conf.hasS3Storage()) {
+      Map<StorageInfo.Type, FileSystem> hadoopFs = getHadoopFs(conf);
+      FileSystem s3client = hadoopFs.get(StorageInfo.Type.S3);
+      logger.info("S3 client: {}", s3client);
+      if (s3client == null)
+        throw new IllegalStateException("S3 type is required but the S3 client was not created");
+    }
     logger.info("Created ShuffleClientImpl, appUniqueId: {}", appUniqueId);
   }
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 2940afcd2..4bb394a6c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -99,6 +99,7 @@ object CelebornHadoopUtils extends Logging {
       dirs.foreach {
         case (storageType, dir) => {
           val path = new Path(dir)
+          logInfo(s"Creating HadoopFS for type $storageType at path $path")
           hadoopFs.put(storageType, path.getFileSystem(hadoopConf))
         }
       })
diff --git a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index 11951e36e..ab1d8c9a0 100644
--- a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -27,12 +27,14 @@ import java.util.List;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.retry.PredefinedBackoffStrategies;
 import com.amazonaws.retry.PredefinedRetryPolicies;
 import com.amazonaws.retry.RetryPolicy;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -61,15 +63,15 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
 
   private String uploadId;
 
-  private AmazonS3 s3Client;
+  private final AmazonS3 s3Client;
 
-  private String key;
+  private final String key;
 
-  private String bucketName;
+  private final String bucketName;
 
-  private Integer s3MultiplePartUploadMaxRetries;
-  private Integer baseDelay;
-  private Integer maxBackoff;
+  private final Integer s3MultiplePartUploadMaxRetries;
+  private final Integer baseDelay;
+  private final Integer maxBackoff;
 
   public S3MultipartUploadHandler(
       FileSystem hadoopFs,
@@ -103,12 +105,23 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
         new ClientConfiguration()
             .withRetryPolicy(retryPolicy)
             .withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
-    this.s3Client =
+    AmazonS3ClientBuilder builder =
         AmazonS3ClientBuilder.standard()
             .withCredentials(providers)
-            .withRegion(conf.get(Constants.AWS_REGION))
-            .withClientConfiguration(clientConfig)
-            .build();
+            .withClientConfiguration(clientConfig);
+    // for MinIO
+    String endpoint = conf.get("fs.s3a.endpoint");
+    if (endpoint != null && !endpoint.isEmpty()) {
+      builder =
+          builder
+              .withEndpointConfiguration(
+                  new AwsClientBuilder.EndpointConfiguration(
+                      endpoint, conf.get(Constants.AWS_REGION)))
+              .withPathStyleAccessEnabled(conf.getBoolean("fs.s3a.path.style.access", false));
+    } else {
+      builder = builder.withRegion(conf.get(Constants.AWS_REGION));
+    }
+    this.s3Client = builder.build();
     this.key = key;
   }
 
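The endpoint branch above is what lets the uploader talk to S3-compatible
stores. As a standalone illustration (not part of the patch; the credentials,
URL, and region are placeholders, and a static credentials provider stands in
for the environment-variable provider the handler actually uses):

    import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
    import com.amazonaws.client.builder.AwsClientBuilder
    import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

    object MinioS3ClientSketch {
      def main(args: Array[String]): Unit = {
        val s3: AmazonS3 = AmazonS3ClientBuilder.standard()
          .withCredentials(new AWSStaticCredentialsProvider(
            new BasicAWSCredentials("minioadmin", "minioadmin")))
          // a custom endpoint replaces withRegion(); the region argument
          // is still used as the signing region
          .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration("http://localhost:9000", "us-east-1"))
          // MinIO addresses buckets by path rather than by virtual-host name
          .withPathStyleAccessEnabled(true)
          .build()
        s3.listBuckets().forEach(b => println(b.getName))
      }
    }
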
diff --git a/pom.xml b/pom.xml
index 0bd41fbff..371f9b2b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,7 @@
     <maven.plugin.silencer.version>1.7.19</maven.plugin.silencer.version>
     <maven.plugin.resources.version>3.3.1</maven.plugin.resources.version>
     <openapi.generator.version>7.8.0</openapi.generator.version>
+    <testcontainers-minio.version>1.21.4</testcontainers-minio.version>
 
     <!-- Allow modules to enable / disable certain build plugins easily. -->
     <testJarPhase>prepare-package</testJarPhase>
@@ -525,6 +526,12 @@
         <version>${jakarta.ws.rs-api.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>minio</artifactId>
+        <version>${testcontainers-minio.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.openapitools</groupId>
         <artifactId>jackson-databind-nullable</artifactId>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index d303a3439..107edabac 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -90,6 +90,9 @@ object Dependencies {
   val picocliVersion = "4.7.6"
   val jmhVersion = "1.37"
 
+  // S3 integration tests with Minio
+  val testContainerMinioVersion = "1.21.4"
+
   // For SSL support
   val bouncycastleVersion = "1.77"
 
@@ -213,6 +216,8 @@ object Dependencies {
   val jakartaAnnotationApi = "jakarta.annotation" % "jakarta.annotation-api" % jakartaAnnotationApiVersion
   val jakartaWsRsApi = "jakarta.ws.rs" % "jakarta.ws.rs-api" % jakartaWsRsApiVersion
 
+  val testContainerMinio = "org.testcontainers" % "minio" % testContainerMinioVersion
+
   // Test dependencies
   // https://www.scala-sbt.org/1.x/docs/Testing.html
   val junitInterface = "com.github.sbt" % "junit-interface" % junitInterfaceVersion
@@ -425,7 +430,8 @@ object CelebornCommonSettings {
     Dependencies.scalatest % "test",
     Dependencies.junit % "test",
     // https://www.scala-sbt.org/1.x/docs/Testing.html
-    Dependencies.junitInterface % "test")
+    Dependencies.junitInterface % "test",
+    Dependencies.testContainerMinio % "test")
 }
 
 object CelebornBuild extends sbt.internal.BuildDef {
diff --git a/tests/spark-it/pom.xml b/tests/spark-it/pom.xml
index c0a37ea6c..a87594f5c 100644
--- a/tests/spark-it/pom.xml
+++ b/tests/spark-it/pom.xml
@@ -182,6 +182,11 @@
       <artifactId>jakarta.servlet-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>minio</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala
new file mode 100644
index 000000000..f65b00c4f
--- /dev/null
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+import org.testcontainers.containers.MinIOContainer
+
+import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
+
+class BasicEndToEndTieredStorageTest extends AnyFunSuite
+  with SparkTestBase
+  with BeforeAndAfterEach {
+
+  var container: MinIOContainer = null
+  val skipAWSTest = !isClassPresent("org.apache.hadoop.fs.s3a.S3AFileSystem")
+
+  def isClassPresent(className: String): Boolean = {
+    try {
+      Class.forName(className)
+      true
+    } catch {
+      case _: ClassNotFoundException => false
+    }
+  }
+
+  override def beforeAll(): Unit = {
+
+    if (skipAWSTest)
+      return
+
+    container = new MinIOContainer("minio/minio:RELEASE.2023-09-04T19-57-37Z")
+    container.start()
+
+    // create bucket using Minio command line tool
+    container.execInContainer(
+      "mc",
+      "alias",
+      "set",
+      "dockerminio",
+      "http://minio:9000";,
+      container.getUserName,
+      container.getPassword)
+    container.execInContainer("mc", "mb", "dockerminio/sample-bucket")
+
+    System.setProperty("aws.accessKeyId", container.getUserName)
+    System.setProperty("aws.secretKey", container.getPassword)
+
+    val s3url = container.getS3URL
+    val augmentedConfiguration = Map(
+      CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3",
+      CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3",
+      CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY|S3",
+      "celeborn.hadoop.fs.s3a.endpoint" -> s"$s3url",
+      "celeborn.hadoop.fs.s3a.aws.credentials.provider" -> 
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
+      "celeborn.hadoop.fs.s3a.access.key" -> container.getUserName,
+      "celeborn.hadoop.fs.s3a.secret.key" -> container.getPassword,
+      "celeborn.hadoop.fs.s3a.path.style.access" -> "true",
+      CelebornConf.S3_DIR.key -> "s3://sample-bucket/test/celeborn",
+      CelebornConf.S3_ENDPOINT_REGION.key -> "dummy-region")
+
+    setupMiniClusterWithRandomPorts(
+      masterConf = augmentedConfiguration,
+      workerConf = augmentedConfiguration,
+      workerNum = 1)
+  }
+
+  override def beforeEach(): Unit = {
+    ShuffleClient.reset()
+  }
+
+  override def afterAll(): Unit = {
+    System.clearProperty("aws.accessKeyId")
+    System.clearProperty("aws.secretKey")
+    if (container != null) {
+      container.close()
+      super.afterAll()
+    }
+  }
+
+  override def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
+    val s3url = container.getS3URL
+    val newConf = sparkConf
+      .set("spark." + CelebornConf.ACTIVE_STORAGE_TYPES.key, "MEMORY,S3")
+      .set("spark." + CelebornConf.S3_DIR.key, 
"s3://sample-bucket/test/celeborn")
+      .set("spark." + CelebornConf.S3_ENDPOINT_REGION.key, "dummy-region")
+      .set("spark.celeborn.hadoop.fs.s3a.endpoint", s"$s3url")
+      .set(
+        "spark.celeborn.hadoop.fs.s3a.aws.credentials.provider",
+        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+      .set("spark.celeborn.hadoop.fs.s3a.access.key", container.getUserName)
+      .set("spark.celeborn.hadoop.fs.s3a.secret.key", container.getPassword)
+      .set("spark.celeborn.hadoop.fs.s3a.path.style.access", "true")
+
+    super.updateSparkConf(newConf, mode)
+  }
+
+  test("celeborn spark integration test - s3") {
+    assume(
+      !skipAWSTest,
+      "Skipping test because AWS Hadoop client is not in the classpath (enable 
with -Paws")
+
+    val s3url = container.getS3URL
+    log.info(s"s3url $s3url")
+    val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
+    val celebornSparkSession = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
+    groupBy(celebornSparkSession)
+
+    celebornSparkSession.stop()
+  }
+
+}
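
To run just this suite locally, something like the following should work (the
wildcardSuites flag assumes the scalatest-maven-plugin wiring used by the
build, and spark-3.5 is only an example matrix value):

    build/mvn -Pgoogle-mirror,aws,spark-3.5 -pl tests/spark-it -am install -DskipTests
    build/mvn -Pgoogle-mirror,aws,spark-3.5 -pl tests/spark-it test \
      -Dtest=none -DwildcardSuites=org.apache.celeborn.tests.spark.BasicEndToEndTieredStorageTest
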
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index a0b46dd6d..17b725b2d 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,6 +43,7 @@ import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, Disk
 import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSource}
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
+import org.apache.celeborn.common.protocol.StorageInfo.Type
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
@@ -1081,6 +1082,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       partitionType: PartitionType,
       partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
     val suggestedMountPoint = location.getStorageInfo.getMountPoint
+    val storageType = location.getStorageInfo.getType
     var retryCount = 0
     var exception: IOException = null
     val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
@@ -1102,7 +1104,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
        throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
       }
 
-      if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
+      if (storageType == Type.HDFS && location.getStorageInfo.HDFSAvailable()) {
         val shuffleDir =
          new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
         FileSystem.mkdirs(
@@ -1120,9 +1122,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
           fileName,
           hdfsFileInfo)
         return (hdfsFlusher.get, hdfsFileInfo, null)
-      } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
+      } else if (storageType == Type.S3 && location.getStorageInfo.S3Available()) {
         val shuffleDir =
          new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
+        logDebug(s"trying to create S3 file at $shuffleDir")
         FileSystem.mkdirs(
           StorageManager.hadoopFs.get(StorageInfo.Type.S3),
           shuffleDir,
@@ -1138,7 +1141,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
           fileName,
           s3FileInfo)
         return (s3Flusher.get, s3FileInfo, null)
-      } else if (dirs.isEmpty && location.getStorageInfo.OSSAvailable()) {
+      } else if (storageType == Type.OSS && location.getStorageInfo.OSSAvailable()) {
         val shuffleDir =
          new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
         FileSystem.mkdirs(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 08a1d0851..028e54bd7 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -160,6 +160,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
                   storageManager)
               }
             } else {
+              logError(
+                s"CANNOT create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName} because localOrDfsStorageAvailable is false")
               null
             }
         }
