This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1053129ea [CELEBORN-1490][CIP-6] Introduce tier factory and master 
agent in flink hybrid shuffle
1053129ea is described below

commit 1053129ea41a5ed2e5e809e53d4f1ba970bd849a
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Sep 13 23:13:01 2024 +0800

    [CELEBORN-1490][CIP-6] Introduce tier factory and master agent in flink 
hybrid shuffle
    
    ### What changes were proposed in this pull request?
    
    Implement `TieredFactory` interface to adapt Flink hybrid shuffle and also 
introduce `CelebornTierMasterAgent`.
    
    Note: Only the last commit need review in this PR.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    No need.
    
    Closes #2721 from reswqa/cip6-4-pr.
    
    Authored-by: Weijie Guo <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../plugin/flink/FlinkResultPartitionInfo.java     |  35 +++-
 .../plugin/flink/tiered/CelebornTierFactory.java   | 120 +++++++++++
 .../flink/tiered/CelebornTierMasterAgent.java      | 222 +++++++++++++++++++++
 .../flink/tiered/TierShuffleDescriptorImpl.java    |  72 +++++++
 4 files changed, 438 insertions(+), 11 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
index a8591d3e2..596c1ee5c 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
@@ -18,7 +18,9 @@
 package org.apache.celeborn.plugin.flink;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 
@@ -26,39 +28,50 @@ import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
 
 public class FlinkResultPartitionInfo {
   private final JobID jobID;
-  private final PartitionDescriptor partitionDescriptor;
-  private final ProducerDescriptor producerDescriptor;
+  private final ResultPartitionID resultPartitionId;
+  private final IntermediateResultPartitionID partitionId;
+  private final ExecutionAttemptID producerId;
+
+  public FlinkResultPartitionInfo(JobID jobID, ResultPartitionID 
resultPartitionId) {
+    this.jobID = jobID;
+    this.resultPartitionId = resultPartitionId;
+    this.partitionId = resultPartitionId.getPartitionId();
+    this.producerId = resultPartitionId.getProducerId();
+  }
 
   public FlinkResultPartitionInfo(
       JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
     this.jobID = jobID;
-    this.partitionDescriptor = partitionDescriptor;
-    this.producerDescriptor = producerDescriptor;
+    this.resultPartitionId =
+        new ResultPartitionID(
+            partitionDescriptor.getPartitionId(), 
producerDescriptor.getProducerExecutionId());
+    this.partitionId = partitionDescriptor.getPartitionId();
+    this.producerId = producerDescriptor.getProducerExecutionId();
   }
 
   public ResultPartitionID getResultPartitionId() {
-    return new ResultPartitionID(
-        partitionDescriptor.getPartitionId(), 
producerDescriptor.getProducerExecutionId());
+    return resultPartitionId;
   }
 
   public String getShuffleId() {
-    return FlinkUtils.toShuffleId(jobID, partitionDescriptor.getResultId());
+    return FlinkUtils.toShuffleId(jobID, 
partitionId.getIntermediateDataSetID());
   }
 
   public int getTaskId() {
-    return partitionDescriptor.getPartitionId().getPartitionNumber();
+    return partitionId.getPartitionNumber();
   }
 
   public String getAttemptId() {
-    return FlinkUtils.toAttemptId(producerDescriptor.getProducerExecutionId());
+    return FlinkUtils.toAttemptId(producerId);
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("FlinkResultPartitionInfo{");
     sb.append("jobID=").append(jobID);
-    sb.append(", 
partitionDescriptor=").append(partitionDescriptor.getPartitionId());
-    sb.append(", 
producerDescriptor=").append(producerDescriptor.getProducerExecutionId());
+    sb.append(", resultPartitionId=").append(resultPartitionId);
+    sb.append(", partitionId=").append(partitionId);
+    sb.append(", producerId=").append(producerId);
     sb.append('}');
     return sb.toString();
   }
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
new file mode 100644
index 000000000..326a11985
--- /dev/null
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.tiered;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+/**
+ * The factory class of the Celeborn client, used as a tier of flink hybrid 
shuffle tiered storage.
+ */
+public class CelebornTierFactory implements TierFactory {
+
+  private CelebornConf conf;
+
+  /**
+   * The bytes size of a single buffer, default value is 32KB, it will be set 
according to the flink
+   * configuration in {@link CelebornTierFactory#setup}.
+   */
+  private int bufferSizeBytes = -1;
+
+  /**
+   * The max bytes size of a single segment, it will determine how many buffer 
can save in a single
+   * segment.
+   */
+  private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
+
+  private static final String CELEBORN_TIER_NAME = 
CelebornTierFactory.class.getSimpleName();
+
+  @Override
+  public void setup(Configuration configuration) {
+    conf = FlinkUtils.toCelebornConf(configuration);
+    this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration);
+  }
+
+  @Override
+  public TieredStorageMemorySpec getMasterAgentMemorySpec() {
+    return new TieredStorageMemorySpec(getCelebornTierName(), 0);
+  }
+
+  @Override
+  public TieredStorageMemorySpec getProducerAgentMemorySpec() {
+    return new TieredStorageMemorySpec(getCelebornTierName(), 1);
+  }
+
+  @Override
+  public TieredStorageMemorySpec getConsumerAgentMemorySpec() {
+    return new TieredStorageMemorySpec(getCelebornTierName(), 0);
+  }
+
+  @Override
+  public TierMasterAgent createMasterAgent(
+      TieredStorageResourceRegistry tieredStorageResourceRegistry) {
+    return new CelebornTierMasterAgent(conf);
+  }
+
+  @Override
+  public TierProducerAgent createProducerAgent(
+      int numPartitions,
+      int numSubpartitions,
+      TieredStoragePartitionId partitionId,
+      String dataFileBasePath,
+      boolean isBroadcastOnly,
+      TieredStorageMemoryManager storageMemoryManager,
+      TieredStorageNettyService nettyService,
+      TieredStorageResourceRegistry resourceRegistry,
+      BatchShuffleReadBufferPool bufferPool,
+      ScheduledExecutorService ioExecutor,
+      List<TierShuffleDescriptor> shuffleDescriptors,
+      int maxRequestedBuffers) {
+    // TODO impl this in the follow-up PR.
+    return null;
+  }
+
+  @Override
+  public TierConsumerAgent createConsumerAgent(
+      List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+      List<TierShuffleDescriptor> shuffleDescriptors,
+      TieredStorageNettyService nettyService) {
+    // TODO impl this in the follow-up PR.
+    return null;
+  }
+
+  public static String getCelebornTierName() {
+    return CELEBORN_TIER_NAME;
+  }
+}
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
new file mode 100644
index 000000000..fe10a889a
--- /dev/null
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.tiered;
+
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
+import org.apache.flink.util.ExecutorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.util.JavaUtils;
+import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.plugin.flink.FlinkResultPartitionInfo;
+import org.apache.celeborn.plugin.flink.RemoteShuffleResource;
+import org.apache.celeborn.plugin.flink.ShuffleResourceDescriptor;
+import org.apache.celeborn.plugin.flink.ShuffleResourceTracker;
+import org.apache.celeborn.plugin.flink.ShuffleTaskInfo;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+public class CelebornTierMasterAgent implements TierMasterAgent {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CelebornTierMasterAgent.class);
+
+  // Flink JobId -> Celeborn register shuffleIds
+  private final Map<JobID, Set<Integer>> jobShuffleIds = 
JavaUtils.newConcurrentHashMap();
+
+  private final ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
+
+  private final ScheduledExecutorService executor =
+      
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-client-tier-master-executor");
+  private final long lifecycleManagerTimestamp;
+
+  private final CelebornConf conf;
+
+  private ShuffleResourceTracker shuffleResourceTracker;
+
+  private String celebornAppId;
+
+  private volatile LifecycleManager lifecycleManager;
+
+  public CelebornTierMasterAgent(CelebornConf conf) {
+    this.conf = conf;
+    this.lifecycleManagerTimestamp = System.currentTimeMillis();
+  }
+
+  @Override
+  public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) {
+    if (lifecycleManager == null) {
+      synchronized (CelebornTierMasterAgent.class) {
+        if (lifecycleManager == null) {
+          celebornAppId = 
FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
+          LOG.info("CelebornAppId: {}", celebornAppId);
+          // The default value of this config option is false. If set to true, 
Celeborn will use
+          // local allocated workers as candidate being checked workers, this 
is more useful for
+          // map partition to regenerate the lost data. So if not set, set to 
true as default for
+          // flink.
+          
conf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
+          lifecycleManager = new LifecycleManager(celebornAppId, conf);
+          this.shuffleResourceTracker = new ShuffleResourceTracker(executor, 
lifecycleManager);
+        }
+      }
+    }
+
+    Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new 
HashSet<>());
+    LOG.info("Register job: {}.", jobID);
+    if (previousShuffleIds != null) {
+      throw new RuntimeException("Duplicated registration job: " + jobID);
+    }
+    shuffleResourceTracker.registerJob(getJobShuffleContext(jobID, 
tierShuffleHandler));
+  }
+
+  @Override
+  public void unregisterJob(JobID jobID) {
+    LOG.info("Unregister job: {}.", jobID);
+    Set<Integer> shuffleIds = jobShuffleIds.remove(jobID);
+    if (shuffleIds != null) {
+      executor.execute(
+          () -> {
+            for (Integer shuffleId : shuffleIds) {
+              lifecycleManager.unregisterShuffle(shuffleId);
+              shuffleTaskInfo.removeExpiredShuffle(shuffleId);
+            }
+            shuffleResourceTracker.unRegisterJob(jobID);
+          });
+    }
+  }
+
+  @Override
+  public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
+      JobID jobID, ResultPartitionID resultPartitionID) {
+    FlinkResultPartitionInfo resultPartitionInfo =
+        new FlinkResultPartitionInfo(jobID, resultPartitionID);
+    ShuffleResourceDescriptor shuffleResourceDescriptor =
+        shuffleTaskInfo.genShuffleResourceDescriptor(
+            resultPartitionInfo.getShuffleId(),
+            resultPartitionInfo.getTaskId(),
+            resultPartitionInfo.getAttemptId());
+    Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
+    if (shuffleIds == null) {
+      throw new RuntimeException("Can not find job in master agent, job: " + 
jobID);
+    }
+    shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
+    shuffleResourceTracker.addPartitionResource(
+        jobID,
+        shuffleResourceDescriptor.getShuffleId(),
+        shuffleResourceDescriptor.getPartitionId(),
+        resultPartitionID);
+
+    RemoteShuffleResource remoteShuffleResource =
+        new RemoteShuffleResource(
+            lifecycleManager.getHost(),
+            lifecycleManager.getPort(),
+            lifecycleManagerTimestamp,
+            shuffleResourceDescriptor);
+    return new TierShuffleDescriptorImpl(
+        celebornAppId,
+        jobID,
+        resultPartitionInfo.getShuffleId(),
+        resultPartitionID,
+        remoteShuffleResource);
+  }
+
+  @Override
+  public void releasePartition(TierShuffleDescriptor shuffleDescriptor) {
+    checkState(shuffleDescriptor instanceof TierShuffleDescriptorImpl, "Wrong 
descriptor type.");
+    try {
+      TierShuffleDescriptorImpl descriptor = (TierShuffleDescriptorImpl) 
shuffleDescriptor;
+      RemoteShuffleResource shuffleResource = descriptor.getShuffleResource();
+      ShuffleResourceDescriptor resourceDescriptor =
+          shuffleResource.getMapPartitionShuffleDescriptor();
+      LOG.debug("release partition resource: {}.", resourceDescriptor);
+      lifecycleManager.releasePartition(
+          resourceDescriptor.getShuffleId(), 
resourceDescriptor.getPartitionId());
+      shuffleResourceTracker.removePartitionResource(
+          descriptor.getJobId(),
+          resourceDescriptor.getShuffleId(),
+          resourceDescriptor.getPartitionId());
+    } catch (Throwable throwable) {
+      LOG.debug("Failed to release data partition {}.", shuffleDescriptor, 
throwable);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      jobShuffleIds.clear();
+      lifecycleManager.stop();
+    } catch (Exception e) {
+      LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
+    }
+
+    ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, executor);
+  }
+
+  private JobShuffleContext getJobShuffleContext(
+      JobID jobID, TierShuffleHandler tierShuffleHandler) {
+    return new JobShuffleContext() {
+      @Override
+      public JobID getJobId() {
+        return jobID;
+      }
+
+      @Override
+      public CompletableFuture<?> stopTrackingAndReleasePartitions(
+          Collection<ResultPartitionID> resultPartitionIds) {
+        return tierShuffleHandler.onReleasePartitions(
+            resultPartitionIds.stream()
+                .map(TieredStorageIdMappingUtils::convertId)
+                .collect(Collectors.toList()));
+      }
+
+      @Override
+      public CompletableFuture<Collection<PartitionWithMetrics>> 
getPartitionWithMetrics(
+          Duration duration, Set<ResultPartitionID> set) {
+        // TODO we should impl this when we support JM failover.
+        return CompletableFuture.completedFuture(Collections.emptyList());
+      }
+
+      @Override
+      public void notifyPartitionRecoveryStarted() {
+        // TODO we should impl this when we support JM failover.
+      }
+    };
+  }
+}
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java
new file mode 100644
index 000000000..27450edc8
--- /dev/null
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.tiered;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+
+import org.apache.celeborn.plugin.flink.RemoteShuffleDescriptor;
+import org.apache.celeborn.plugin.flink.RemoteShuffleResource;
+
+/**
+ * Wrap the {@link RemoteShuffleDescriptor} to implement {@link 
TierShuffleDescriptor} interface.
+ */
+public class TierShuffleDescriptorImpl implements TierShuffleDescriptor {
+
+  private final RemoteShuffleDescriptor remoteShuffleDescriptor;
+
+  public TierShuffleDescriptorImpl(
+      String celebornAppId,
+      JobID jobId,
+      String shuffleId,
+      ResultPartitionID resultPartitionID,
+      RemoteShuffleResource shuffleResource) {
+    this.remoteShuffleDescriptor =
+        new RemoteShuffleDescriptor(
+            celebornAppId, jobId, shuffleId, resultPartitionID, 
shuffleResource);
+  }
+
+  public ResultPartitionID getResultPartitionID() {
+    return remoteShuffleDescriptor.getResultPartitionID();
+  }
+
+  public String getCelebornAppId() {
+    return remoteShuffleDescriptor.getCelebornAppId();
+  }
+
+  public JobID getJobId() {
+    return remoteShuffleDescriptor.getJobId();
+  }
+
+  public String getShuffleId() {
+    return remoteShuffleDescriptor.getShuffleId();
+  }
+
+  public RemoteShuffleResource getShuffleResource() {
+    return remoteShuffleDescriptor.getShuffleResource();
+  }
+
+  @Override
+  public String toString() {
+    return "TierShuffleDescriptorImpl{"
+        + "remoteShuffleDescriptor="
+        + remoteShuffleDescriptor
+        + '}';
+  }
+}

Reply via email to