This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1053129ea [CELEBORN-1490][CIP-6] Introduce tier factory and master
agent in flink hybrid shuffle
1053129ea is described below
commit 1053129ea41a5ed2e5e809e53d4f1ba970bd849a
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Sep 13 23:13:01 2024 +0800
[CELEBORN-1490][CIP-6] Introduce tier factory and master agent in flink
hybrid shuffle
### What changes were proposed in this pull request?
Implement the `TierFactory` interface to adapt Celeborn to Flink hybrid shuffle, and also
introduce `CelebornTierMasterAgent`.
Note: only the last commit needs review in this PR.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No need.
Closes #2721 from reswqa/cip6-4-pr.
Authored-by: Weijie Guo <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../plugin/flink/FlinkResultPartitionInfo.java | 35 +++-
.../plugin/flink/tiered/CelebornTierFactory.java | 120 +++++++++++
.../flink/tiered/CelebornTierMasterAgent.java | 222 +++++++++++++++++++++
.../flink/tiered/TierShuffleDescriptorImpl.java | 72 +++++++
4 files changed, 438 insertions(+), 11 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
index a8591d3e2..596c1ee5c 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
@@ -18,7 +18,9 @@
package org.apache.celeborn.plugin.flink;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
@@ -26,39 +28,50 @@ import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
public class FlinkResultPartitionInfo {
private final JobID jobID;
- private final PartitionDescriptor partitionDescriptor;
- private final ProducerDescriptor producerDescriptor;
+ private final ResultPartitionID resultPartitionId;
+ private final IntermediateResultPartitionID partitionId;
+ private final ExecutionAttemptID producerId;
+
+ public FlinkResultPartitionInfo(JobID jobID, ResultPartitionID
resultPartitionId) {
+ this.jobID = jobID;
+ this.resultPartitionId = resultPartitionId;
+ this.partitionId = resultPartitionId.getPartitionId();
+ this.producerId = resultPartitionId.getProducerId();
+ }
public FlinkResultPartitionInfo(
JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor
producerDescriptor) {
this.jobID = jobID;
- this.partitionDescriptor = partitionDescriptor;
- this.producerDescriptor = producerDescriptor;
+ this.resultPartitionId =
+ new ResultPartitionID(
+ partitionDescriptor.getPartitionId(),
producerDescriptor.getProducerExecutionId());
+ this.partitionId = partitionDescriptor.getPartitionId();
+ this.producerId = producerDescriptor.getProducerExecutionId();
}
public ResultPartitionID getResultPartitionId() {
- return new ResultPartitionID(
- partitionDescriptor.getPartitionId(),
producerDescriptor.getProducerExecutionId());
+ return resultPartitionId;
}
public String getShuffleId() {
- return FlinkUtils.toShuffleId(jobID, partitionDescriptor.getResultId());
+ return FlinkUtils.toShuffleId(jobID,
partitionId.getIntermediateDataSetID());
}
public int getTaskId() {
- return partitionDescriptor.getPartitionId().getPartitionNumber();
+ return partitionId.getPartitionNumber();
}
public String getAttemptId() {
- return FlinkUtils.toAttemptId(producerDescriptor.getProducerExecutionId());
+ return FlinkUtils.toAttemptId(producerId);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("FlinkResultPartitionInfo{");
sb.append("jobID=").append(jobID);
- sb.append(",
partitionDescriptor=").append(partitionDescriptor.getPartitionId());
- sb.append(",
producerDescriptor=").append(producerDescriptor.getProducerExecutionId());
+ sb.append(", resultPartitionId=").append(resultPartitionId);
+ sb.append(", partitionId=").append(partitionId);
+ sb.append(", producerId=").append(producerId);
sb.append('}');
return sb.toString();
}
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
new file mode 100644
index 000000000..326a11985
--- /dev/null
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.tiered;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+/**
+ * The factory class of the Celeborn client, used as a tier of flink hybrid
shuffle tiered storage.
+ */
+public class CelebornTierFactory implements TierFactory {
+
+ private CelebornConf conf;
+
+ /**
+ * The bytes size of a single buffer, default value is 32KB, it will be set
according to the flink
+ * configuration in {@link CelebornTierFactory#setup}.
+ */
+ private int bufferSizeBytes = -1;
+
+ /**
+ * The max bytes size of a single segment, it will determine how many buffer
can save in a single
+ * segment.
+ */
+ private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
+
+ private static final String CELEBORN_TIER_NAME =
CelebornTierFactory.class.getSimpleName();
+
+ @Override
+ public void setup(Configuration configuration) {
+ conf = FlinkUtils.toCelebornConf(configuration);
+ this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration);
+ }
+
+ @Override
+ public TieredStorageMemorySpec getMasterAgentMemorySpec() {
+ return new TieredStorageMemorySpec(getCelebornTierName(), 0);
+ }
+
+ @Override
+ public TieredStorageMemorySpec getProducerAgentMemorySpec() {
+ return new TieredStorageMemorySpec(getCelebornTierName(), 1);
+ }
+
+ @Override
+ public TieredStorageMemorySpec getConsumerAgentMemorySpec() {
+ return new TieredStorageMemorySpec(getCelebornTierName(), 0);
+ }
+
+ @Override
+ public TierMasterAgent createMasterAgent(
+ TieredStorageResourceRegistry tieredStorageResourceRegistry) {
+ return new CelebornTierMasterAgent(conf);
+ }
+
+ @Override
+ public TierProducerAgent createProducerAgent(
+ int numPartitions,
+ int numSubpartitions,
+ TieredStoragePartitionId partitionId,
+ String dataFileBasePath,
+ boolean isBroadcastOnly,
+ TieredStorageMemoryManager storageMemoryManager,
+ TieredStorageNettyService nettyService,
+ TieredStorageResourceRegistry resourceRegistry,
+ BatchShuffleReadBufferPool bufferPool,
+ ScheduledExecutorService ioExecutor,
+ List<TierShuffleDescriptor> shuffleDescriptors,
+ int maxRequestedBuffers) {
+ // TODO impl this in the follow-up PR.
+ return null;
+ }
+
+ @Override
+ public TierConsumerAgent createConsumerAgent(
+ List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+ List<TierShuffleDescriptor> shuffleDescriptors,
+ TieredStorageNettyService nettyService) {
+ // TODO impl this in the follow-up PR.
+ return null;
+ }
+
+ public static String getCelebornTierName() {
+ return CELEBORN_TIER_NAME;
+ }
+}
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
new file mode 100644
index 000000000..fe10a889a
--- /dev/null
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.tiered;
+
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
+import org.apache.flink.util.ExecutorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.util.JavaUtils;
+import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.plugin.flink.FlinkResultPartitionInfo;
+import org.apache.celeborn.plugin.flink.RemoteShuffleResource;
+import org.apache.celeborn.plugin.flink.ShuffleResourceDescriptor;
+import org.apache.celeborn.plugin.flink.ShuffleResourceTracker;
+import org.apache.celeborn.plugin.flink.ShuffleTaskInfo;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+public class CelebornTierMasterAgent implements TierMasterAgent {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CelebornTierMasterAgent.class);
+
+ // Flink JobId -> Celeborn register shuffleIds
+ private final Map<JobID, Set<Integer>> jobShuffleIds =
JavaUtils.newConcurrentHashMap();
+
+ private final ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
+
+ private final ScheduledExecutorService executor =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-client-tier-master-executor");
+ private final long lifecycleManagerTimestamp;
+
+ private final CelebornConf conf;
+
+ private ShuffleResourceTracker shuffleResourceTracker;
+
+ private String celebornAppId;
+
+ private volatile LifecycleManager lifecycleManager;
+
+ public CelebornTierMasterAgent(CelebornConf conf) {
+ this.conf = conf;
+ this.lifecycleManagerTimestamp = System.currentTimeMillis();
+ }
+
+ @Override
+ public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) {
+ if (lifecycleManager == null) {
+ synchronized (CelebornTierMasterAgent.class) {
+ if (lifecycleManager == null) {
+ celebornAppId =
FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
+ LOG.info("CelebornAppId: {}", celebornAppId);
+ // The default value of this config option is false. If set to true,
Celeborn will use
+ // local allocated workers as candidate being checked workers, this
is more useful for
+ // map partition to regenerate the lost data. So if not set, set to
true as default for
+ // flink.
+
conf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
+ lifecycleManager = new LifecycleManager(celebornAppId, conf);
+ this.shuffleResourceTracker = new ShuffleResourceTracker(executor,
lifecycleManager);
+ }
+ }
+ }
+
+ Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new
HashSet<>());
+ LOG.info("Register job: {}.", jobID);
+ if (previousShuffleIds != null) {
+ throw new RuntimeException("Duplicated registration job: " + jobID);
+ }
+ shuffleResourceTracker.registerJob(getJobShuffleContext(jobID,
tierShuffleHandler));
+ }
+
+ @Override
+ public void unregisterJob(JobID jobID) {
+ LOG.info("Unregister job: {}.", jobID);
+ Set<Integer> shuffleIds = jobShuffleIds.remove(jobID);
+ if (shuffleIds != null) {
+ executor.execute(
+ () -> {
+ for (Integer shuffleId : shuffleIds) {
+ lifecycleManager.unregisterShuffle(shuffleId);
+ shuffleTaskInfo.removeExpiredShuffle(shuffleId);
+ }
+ shuffleResourceTracker.unRegisterJob(jobID);
+ });
+ }
+ }
+
+ @Override
+ public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
+ JobID jobID, ResultPartitionID resultPartitionID) {
+ FlinkResultPartitionInfo resultPartitionInfo =
+ new FlinkResultPartitionInfo(jobID, resultPartitionID);
+ ShuffleResourceDescriptor shuffleResourceDescriptor =
+ shuffleTaskInfo.genShuffleResourceDescriptor(
+ resultPartitionInfo.getShuffleId(),
+ resultPartitionInfo.getTaskId(),
+ resultPartitionInfo.getAttemptId());
+ Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
+ if (shuffleIds == null) {
+ throw new RuntimeException("Can not find job in master agent, job: " +
jobID);
+ }
+ shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
+ shuffleResourceTracker.addPartitionResource(
+ jobID,
+ shuffleResourceDescriptor.getShuffleId(),
+ shuffleResourceDescriptor.getPartitionId(),
+ resultPartitionID);
+
+ RemoteShuffleResource remoteShuffleResource =
+ new RemoteShuffleResource(
+ lifecycleManager.getHost(),
+ lifecycleManager.getPort(),
+ lifecycleManagerTimestamp,
+ shuffleResourceDescriptor);
+ return new TierShuffleDescriptorImpl(
+ celebornAppId,
+ jobID,
+ resultPartitionInfo.getShuffleId(),
+ resultPartitionID,
+ remoteShuffleResource);
+ }
+
+ @Override
+ public void releasePartition(TierShuffleDescriptor shuffleDescriptor) {
+ checkState(shuffleDescriptor instanceof TierShuffleDescriptorImpl, "Wrong
descriptor type.");
+ try {
+ TierShuffleDescriptorImpl descriptor = (TierShuffleDescriptorImpl)
shuffleDescriptor;
+ RemoteShuffleResource shuffleResource = descriptor.getShuffleResource();
+ ShuffleResourceDescriptor resourceDescriptor =
+ shuffleResource.getMapPartitionShuffleDescriptor();
+ LOG.debug("release partition resource: {}.", resourceDescriptor);
+ lifecycleManager.releasePartition(
+ resourceDescriptor.getShuffleId(),
resourceDescriptor.getPartitionId());
+ shuffleResourceTracker.removePartitionResource(
+ descriptor.getJobId(),
+ resourceDescriptor.getShuffleId(),
+ resourceDescriptor.getPartitionId());
+ } catch (Throwable throwable) {
+ LOG.debug("Failed to release data partition {}.", shuffleDescriptor,
throwable);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ jobShuffleIds.clear();
+ lifecycleManager.stop();
+ } catch (Exception e) {
+ LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
+ }
+
+ ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, executor);
+ }
+
+ private JobShuffleContext getJobShuffleContext(
+ JobID jobID, TierShuffleHandler tierShuffleHandler) {
+ return new JobShuffleContext() {
+ @Override
+ public JobID getJobId() {
+ return jobID;
+ }
+
+ @Override
+ public CompletableFuture<?> stopTrackingAndReleasePartitions(
+ Collection<ResultPartitionID> resultPartitionIds) {
+ return tierShuffleHandler.onReleasePartitions(
+ resultPartitionIds.stream()
+ .map(TieredStorageIdMappingUtils::convertId)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public CompletableFuture<Collection<PartitionWithMetrics>>
getPartitionWithMetrics(
+ Duration duration, Set<ResultPartitionID> set) {
+ // TODO we should impl this when we support JM failover.
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
+ @Override
+ public void notifyPartitionRecoveryStarted() {
+ // TODO we should impl this when we support JM failover.
+ }
+ };
+ }
+}
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java
new file mode 100644
index 000000000..27450edc8
--- /dev/null
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.tiered;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+
+import org.apache.celeborn.plugin.flink.RemoteShuffleDescriptor;
+import org.apache.celeborn.plugin.flink.RemoteShuffleResource;
+
+/**
+ * Wrap the {@link RemoteShuffleDescriptor} to implement {@link
TierShuffleDescriptor} interface.
+ */
+public class TierShuffleDescriptorImpl implements TierShuffleDescriptor {
+
+ private final RemoteShuffleDescriptor remoteShuffleDescriptor;
+
+ public TierShuffleDescriptorImpl(
+ String celebornAppId,
+ JobID jobId,
+ String shuffleId,
+ ResultPartitionID resultPartitionID,
+ RemoteShuffleResource shuffleResource) {
+ this.remoteShuffleDescriptor =
+ new RemoteShuffleDescriptor(
+ celebornAppId, jobId, shuffleId, resultPartitionID,
shuffleResource);
+ }
+
+ public ResultPartitionID getResultPartitionID() {
+ return remoteShuffleDescriptor.getResultPartitionID();
+ }
+
+ public String getCelebornAppId() {
+ return remoteShuffleDescriptor.getCelebornAppId();
+ }
+
+ public JobID getJobId() {
+ return remoteShuffleDescriptor.getJobId();
+ }
+
+ public String getShuffleId() {
+ return remoteShuffleDescriptor.getShuffleId();
+ }
+
+ public RemoteShuffleResource getShuffleResource() {
+ return remoteShuffleDescriptor.getShuffleResource();
+ }
+
+ @Override
+ public String toString() {
+ return "TierShuffleDescriptorImpl{"
+ + "remoteShuffleDescriptor="
+ + remoteShuffleDescriptor
+ + '}';
+ }
+}