This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f3f10487 [CELEBORN-75] Initialize flink plugin module (#1027)
f3f10487 is described below
commit f3f104870c91dec60c546278d3ac7dcb7c59a1d4
Author: Shuang <[email protected]>
AuthorDate: Wed Dec 7 15:53:00 2022 +0800
[CELEBORN-75] Initialize flink plugin module (#1027)
---
{client => client-flink/flink-1.14}/pom.xml | 48 ++----
.../plugin/flink/FlinkResultPartitionInfo.java | 52 ++++++
.../plugin/flink/RemoteShuffleDescriptor.java | 59 +++++++
.../celeborn/plugin/flink/RemoteShuffleMaster.java | 148 +++++++++++++++++
.../plugin/flink/RemoteShuffleResource.java | 50 ++++++
.../plugin/flink/RemoteShuffleServiceFactory.java | 40 +++++
.../celeborn/plugin/flink/ShuffleResource.java | 25 +++
.../plugin/flink/ShuffleResourceDescriptor.java | 67 ++++++++
.../celeborn/plugin/flink/utils/FlinkUtils.java | 55 ++++++
.../celeborn/plugin/flink/utils/ThreadUtils.java | 48 ++++++
.../plugin/flink/RemoteShuffleMasterTest.java | 185 +++++++++++++++++++++
client-flink/flink-shaded/pom.xml | 123 ++++++++++++++
client/pom.xml | 5 +
.../apache/celeborn/client/LifecycleManager.scala | 20 ++-
.../apache/celeborn/client/ShuffleTaskInfo.scala | 95 +++++++++++
.../celeborn/client/ShuffleTaskInfoSuite.scala | 50 ++++++
dev/reformat | 2 +
pom.xml | 45 ++++-
.../celeborn/tests/client/ShuffleClientSuite.scala | 2 +-
19 files changed, 1081 insertions(+), 38 deletions(-)
diff --git a/client/pom.xml b/client-flink/flink-1.14/pom.xml
similarity index 60%
copy from client/pom.xml
copy to client-flink/flink-1.14/pom.xml
index 45e8e7df..4aa5b01e 100644
--- a/client/pom.xml
+++ b/client-flink/flink-1.14/pom.xml
@@ -17,58 +17,36 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
<parent>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-parent_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <relativePath>../pom.xml</relativePath>
+ <relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+
<artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
- <name>Celeborn Client</name>
+ <name>Celeborn Client for flink</name>
<dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>org.lz4</groupId>
- <artifactId>lz4-java</artifactId>
- </dependency>
- <dependency>
- <groupId>com.github.luben</groupId>
- <artifactId>zstd-jni</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
new file mode 100644
index 00000000..588d0c7d
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+public class FlinkResultPartitionInfo {
+ private final PartitionDescriptor partitionDescriptor;
+ private final ProducerDescriptor producerDescriptor;
+
+ public FlinkResultPartitionInfo(
+ PartitionDescriptor partitionDescriptor, ProducerDescriptor
producerDescriptor) {
+ this.partitionDescriptor = partitionDescriptor;
+ this.producerDescriptor = producerDescriptor;
+ }
+
+ public ResultPartitionID getResultPartitionId() {
+ return new ResultPartitionID(
+ partitionDescriptor.getPartitionId(),
producerDescriptor.getProducerExecutionId());
+ }
+
+ public String getShuffleId() {
+ return FlinkUtils.toShuffleId(partitionDescriptor.getResultId());
+ }
+
+ public int getTaskId() {
+ return partitionDescriptor.getPartitionId().getPartitionNumber();
+ }
+
+ public String getAttemptId() {
+ return FlinkUtils.toAttemptId(producerDescriptor.getProducerExecutionId());
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleDescriptor.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleDescriptor.java
new file mode 100644
index 00000000..12db3495
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.util.Optional;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+
+public class RemoteShuffleDescriptor implements ShuffleDescriptor {
+
+ private final ResultPartitionID resultPartitionID;
+
+ private final JobID jobID;
+
+ private final ShuffleResource shuffleResource;
+
+ public RemoteShuffleDescriptor(
+ JobID jobID, ResultPartitionID resultPartitionID, ShuffleResource
shuffleResource) {
+ this.jobID = jobID;
+ this.resultPartitionID = resultPartitionID;
+ this.shuffleResource = shuffleResource;
+ }
+
+ @Override
+ public ResultPartitionID getResultPartitionID() {
+ return resultPartitionID;
+ }
+
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ public ShuffleResource getShuffleResource() {
+ return shuffleResource;
+ }
+
+ @Override
+ public Optional<ResourceID> storesLocalResourcesOn() {
+ return Optional.empty();
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
new file mode 100644
index 00000000..222c0516
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+import org.apache.celeborn.plugin.flink.utils.ThreadUtils;
+
+public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescriptor> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoteShuffleMaster.class);
+
+ private final ShuffleMasterContext shuffleMasterContext;
+ // Flink JobId -> Celeborn LifecycleManager
+ private Map<JobID, LifecycleManager> lifecycleManagers = new
ConcurrentHashMap<>();
+ private final ScheduledThreadPoolExecutor executor =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ ThreadUtils.createFactoryWithDefaultExceptionHandler(
+ "remote-shuffle-master-executor", LOG));
+
+ public RemoteShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
+ this.shuffleMasterContext = shuffleMasterContext;
+ }
+
+ @Override
+ public void registerJob(JobShuffleContext context) {
+ JobID jobId = context.getJobId();
+ Future<?> submit =
+ executor.submit(
+ () -> {
+ if (lifecycleManagers.containsKey(jobId)) {
+ throw new RuntimeException("Duplicated registration job: " +
jobId);
+ } else {
+ CelebornConf celebornConf =
+
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
+ LifecycleManager lifecycleManager =
+ new LifecycleManager(FlinkUtils.toCelebornAppId(jobId),
celebornConf);
+ lifecycleManagers.put(jobId, lifecycleManager);
+ }
+ });
+
+ try {
+ submit.get();
+ } catch (InterruptedException e) {
+ LOG.error("Encounter interruptedException when registration job: {}.",
jobId, e);
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ LOG.error("Encounter an error when registration job: {}.", jobId, e);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ @Override
+ public void unregisterJob(JobID jobID) {
+ executor.execute(
+ () -> {
+ try {
+ LOG.info("Unregister job: {}.", jobID);
+ LifecycleManager lifecycleManager =
lifecycleManagers.remove(jobID);
+ if (lifecycleManager != null) {
+ lifecycleManager.stop();
+ }
+ } catch (Throwable throwable) {
+ LOG.error("Encounter an error when unregistering job: {}.", jobID,
throwable);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<RemoteShuffleDescriptor>
registerPartitionWithProducer(
+ JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor
producerDescriptor) {
+ CompletableFuture<RemoteShuffleDescriptor> completableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ LifecycleManager lifecycleManager = lifecycleManagers.get(jobID);
+ FlinkResultPartitionInfo resultPartitionInfo =
+ new FlinkResultPartitionInfo(partitionDescriptor,
producerDescriptor);
+ LifecycleManager.ShuffleTask shuffleTask =
+ lifecycleManager.encodeExternalShuffleTask(
+ resultPartitionInfo.getShuffleId(),
+ resultPartitionInfo.getTaskId(),
+ resultPartitionInfo.getAttemptId());
+ ShuffleResourceDescriptor shuffleResourceDescriptor =
+ new ShuffleResourceDescriptor(shuffleTask);
+ RemoteShuffleResource remoteShuffleResource =
+ new RemoteShuffleResource(
+ lifecycleManager.getRssMetaServiceHost(),
+ lifecycleManager.getRssMetaServicePort(),
+ shuffleResourceDescriptor);
+ return new RemoteShuffleDescriptor(
+ jobID, resultPartitionInfo.getResultPartitionId(),
remoteShuffleResource);
+ },
+ executor);
+
+ return completableFuture;
+ }
+
+ @Override
+ public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
+ // TODO
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ for (LifecycleManager lifecycleManager : lifecycleManagers.values()) {
+ lifecycleManager.stop();
+ }
+ } catch (Exception e) {
+ LOG.warn("Encounter exception when shutdown: " + e.getMessage(), e);
+ }
+
+ ThreadUtils.shutdownExecutors(10, executor);
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResource.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResource.java
new file mode 100644
index 00000000..ad8eef06
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+public class RemoteShuffleResource implements ShuffleResource {
+
+ private static final long serialVersionUID = 6497939083185255973L;
+
+ private final String rssMetaServiceHost;
+ private final int rssMetaServicePort;
+
+ private ShuffleResourceDescriptor shuffleResourceDescriptor;
+
+ public RemoteShuffleResource(
+ String rssMetaServiceHost,
+ int rssMetaServicePort,
+ ShuffleResourceDescriptor remoteShuffleDescriptor) {
+ this.rssMetaServiceHost = rssMetaServiceHost;
+ this.rssMetaServicePort = rssMetaServicePort;
+ this.shuffleResourceDescriptor = remoteShuffleDescriptor;
+ }
+
+ @Override
+ public ShuffleResourceDescriptor getMapPartitionShuffleDescriptor() {
+ return shuffleResourceDescriptor;
+ }
+
+ public String getRssMetaServiceHost() {
+ return rssMetaServiceHost;
+ }
+
+ public int getRssMetaServicePort() {
+ return rssMetaServicePort;
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
new file mode 100644
index 00000000..6294f3da
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.*;
+
+public class RemoteShuffleServiceFactory
+ implements ShuffleServiceFactory<
+ RemoteShuffleDescriptor, ResultPartitionWriter, IndexedInputGate> {
+
+ @Override
+ public ShuffleMaster<RemoteShuffleDescriptor> createShuffleMaster(
+ ShuffleMasterContext shuffleMasterContext) {
+ return new RemoteShuffleMaster(shuffleMasterContext);
+ }
+
+ @Override
+ public ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate>
createShuffleEnvironment(
+ ShuffleEnvironmentContext shuffleEnvironmentContext) {
+ // TODO
+ return null;
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResource.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResource.java
new file mode 100644
index 00000000..2a60a75e
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.Serializable;
+
+public interface ShuffleResource extends Serializable {
+
+ ShuffleResourceDescriptor getMapPartitionShuffleDescriptor();
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceDescriptor.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceDescriptor.java
new file mode 100644
index 00000000..050f8b83
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceDescriptor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.Serializable;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.util.PackedPartitionId;
+
+public class ShuffleResourceDescriptor implements Serializable {
+
+ private static final long serialVersionUID = -1251659747395561342L;
+
+ private final int shuffleId;
+ private final int mapId;
+ private final int attemptId;
+ private final int partitionId;
+
+ public ShuffleResourceDescriptor(LifecycleManager.ShuffleTask shuffleTask) {
+ this.shuffleId = shuffleTask.shuffleId();
+ this.mapId = shuffleTask.mapId();
+ this.attemptId = shuffleTask.attemptId();
+ this.partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+ }
+
+ public int getShuffleId() {
+ return shuffleId;
+ }
+
+ public int getMapId() {
+ return mapId;
+ }
+
+ public int getAttemptId() {
+ return attemptId;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ShuffleResourceDescriptor{");
+ sb.append("shuffleId=").append(shuffleId);
+ sb.append(", mapId=").append(mapId);
+ sb.append(", attemptId=").append(attemptId);
+ sb.append(", partitionId=").append(partitionId);
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
new file mode 100644
index 00000000..1182c726
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.utils;
+
+import java.util.Map;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class FlinkUtils {
+
+ public static CelebornConf toCelebornConf(Configuration configuration) {
+ CelebornConf tmpCelebornConf = new CelebornConf();
+ Map<String, String> confMap = configuration.toMap();
+ for (Map.Entry<String, String> entry : confMap.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith("celeborn.")) {
+ tmpCelebornConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ return tmpCelebornConf;
+ }
+
+ public static String toCelebornAppId(JobID jobID) {
+ return jobID.toString();
+ }
+
+ public static String toShuffleId(IntermediateDataSetID dataSetID) {
+ return dataSetID.toString();
+ }
+
+ public static String toAttemptId(ExecutionAttemptID attemptID) {
+ return attemptID.toString();
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/ThreadUtils.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/ThreadUtils.java
new file mode 100644
index 00000000..58e23ddc
--- /dev/null
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/ThreadUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.util.ExecutorUtils;
+import org.slf4j.Logger;
+
+public class ThreadUtils {
+
+ public static ThreadFactory createFactoryWithDefaultExceptionHandler(
+ final String executorServiceName, final Logger LOG) {
+ return new ThreadFactoryBuilder()
+ .setNameFormat(executorServiceName + "-%d")
+ .setDaemon(true)
+ .setUncaughtExceptionHandler(
+ (Thread t, Throwable e) ->
+ LOG.error(
+ "exception in serviceName: {}, thread: {}",
+ executorServiceName,
+ t.getName(),
+ e))
+ .build();
+ }
+
+ public static void shutdownExecutors(int timeoutSecs, ExecutorService
executorService) {
+ ExecutorUtils.gracefulShutdown(timeoutSecs, TimeUnit.SECONDS,
executorService);
+ }
+}
diff --git
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
new file mode 100644
index 00000000..20410052
--- /dev/null
+++
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.util.PackedPartitionId;
+
+public class RemoteShuffleMasterTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoteShuffleMasterTest.class);
+ private RemoteShuffleMaster remoteShuffleMaster;
+
+ @Before
+ public void setUp() {
+ Configuration configuration = new Configuration();
+ remoteShuffleMaster = createShuffleMaster(configuration);
+ }
+
+ @Test
+ public void testRegisterJob() {
+ JobShuffleContext jobShuffleContext =
createJobShuffleContext(JobID.generate());
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+
+ // reRunRegister job
+ try {
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+
+ // unRegister job
+ remoteShuffleMaster.unregisterJob(jobShuffleContext.getJobId());
+ }
+
+ @Test
+ public void testRegisterPartitionWithProducer()
+ throws UnknownHostException, ExecutionException, InterruptedException {
+ JobID jobID = JobID.generate();
+ JobShuffleContext jobShuffleContext = createJobShuffleContext(jobID);
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+
+ IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+ PartitionDescriptor partitionDescriptor =
createPartitionDescriptor(intermediateDataSetID, 0);
+ ProducerDescriptor producerDescriptor = createProducerDescriptor();
+ RemoteShuffleDescriptor remoteShuffleDescriptor =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID, partitionDescriptor,
producerDescriptor)
+ .get();
+ ShuffleResource shuffleResource =
remoteShuffleDescriptor.getShuffleResource();
+ ShuffleResourceDescriptor mapPartitionShuffleDescriptor =
+ shuffleResource.getMapPartitionShuffleDescriptor();
+ System.out.println(mapPartitionShuffleDescriptor.toString());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getPartitionId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getAttemptId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getMapId());
+
+ // use same dataset id
+ partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 1);
+ remoteShuffleDescriptor =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID, partitionDescriptor,
producerDescriptor)
+ .get();
+ mapPartitionShuffleDescriptor =
+
remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
+
+ // use another attemptId
+ producerDescriptor = createProducerDescriptor();
+ remoteShuffleDescriptor =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID, partitionDescriptor,
producerDescriptor)
+ .get();
+ mapPartitionShuffleDescriptor =
+
remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(
+ PackedPartitionId.packedPartitionId(1, 1),
mapPartitionShuffleDescriptor.getPartitionId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getAttemptId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
+ }
+
+ @After
+ public void tearDown() {
+ if (remoteShuffleMaster != null) {
+ try {
+ remoteShuffleMaster.close();
+ } catch (Exception e) {
+ LOG.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ public RemoteShuffleMaster createShuffleMaster(Configuration configuration) {
+ remoteShuffleMaster =
+ new RemoteShuffleMaster(
+ new ShuffleMasterContext() {
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public void onFatalError(Throwable throwable) {
+ System.exit(-1);
+ }
+ });
+
+ return remoteShuffleMaster;
+ }
+
+ public JobShuffleContext createJobShuffleContext(JobID jobId) {
+ return new JobShuffleContext() {
+ @Override
+ public org.apache.flink.api.common.JobID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public CompletableFuture<?> stopTrackingAndReleasePartitions(
+ Collection<ResultPartitionID> collection) {
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+ }
+
+ public PartitionDescriptor createPartitionDescriptor(
+ IntermediateDataSetID intermediateDataSetId, int partitionNum) {
+ IntermediateResultPartitionID intermediateResultPartitionId =
+ new IntermediateResultPartitionID(intermediateDataSetId, partitionNum);
+ return new PartitionDescriptor(
+ intermediateDataSetId,
+ 10,
+ intermediateResultPartitionId,
+ ResultPartitionType.BLOCKING,
+ 5,
+ 1);
+ }
+
+ public ProducerDescriptor createProducerDescriptor() throws
UnknownHostException {
+ ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+ return new ProducerDescriptor(
+ ResourceID.generate(), executionAttemptId, InetAddress.getLocalHost(),
100);
+ }
+}
diff --git a/client-flink/flink-shaded/pom.xml
b/client-flink/flink-shaded/pom.xml
new file mode 100644
index 00000000..47800c07
--- /dev/null
+++ b/client-flink/flink-shaded/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+
<artifactId>celeborn-client-flink-${flink.version}-shaded_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Shaded Client for Flink</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+
<artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+
<shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+
<shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <includes>
+ <include>org.apache.celeborn:*</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>io.netty:*</include>
+ <include>org.apache.commons:commons-lang3</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${maven.plugin.antrun.version}</version>
+ <executions>
+ <execution>
+ <id>rename-native-library</id>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <echo message="unpacking netty jar"></echo>
+ <unzip dest="${project.build.directory}/unpacked/"
src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+ <echo message="renaming native epoll library"></echo>
+ <move includeemptydirs="false"
todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_x86_64.so"
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so"
type="glob"></mapper>
+ </move>
+ <move includeemptydirs="false"
todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_aarch_64.so"
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so"
type="glob"></mapper>
+ </move>
+ <echo message="deleting native kqueue library"></echo>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+ <echo message="repackaging netty jar"></echo>
+ <jar basedir="${project.build.directory}/unpacked"
destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/client/pom.xml b/client/pom.xml
index 45e8e7df..1ad42aeb 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -56,6 +56,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 3ef99910..0ecce2ce 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -67,6 +67,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf)
extends RpcEndpoin
val shuffleMapperAttempts = new ConcurrentHashMap[Int, Array[Int]]()
private val reducerFileGroupsMap =
new ConcurrentHashMap[Int, Array[Array[PartitionLocation]]]()
+ private val shuffleTaskInfo = new ShuffleTaskInfo()
// maintain each shuffle's map relation of WorkerInfo and partition location
val shuffleAllocatedWorkers =
new ConcurrentHashMap[Int, ConcurrentHashMap[WorkerInfo,
PartitionLocationInfo]]()
@@ -105,6 +106,11 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
}
}
+ case class ShuffleTask(
+ shuffleId: Int,
+ mapId: Int,
+ attemptId: Int)
+
// register shuffle request waiting for response
private val registeringShuffleRequest =
new ConcurrentHashMap[Int, util.Set[RegisterCallContext]]()
@@ -204,6 +210,18 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
shufflePartitionType.getOrDefault(shuffleId, conf.shufflePartitionType)
}
+ def encodeExternalShuffleTask(
+ taskShuffleId: String,
+ mapId: Int,
+ taskAttemptId: String): ShuffleTask = {
+ val shuffleId = shuffleTaskInfo.getShuffleId(taskShuffleId)
+ val attemptId = shuffleTaskInfo.getAttemptId(taskShuffleId, mapId,
taskAttemptId)
+ logInfo(
+ s"encode task from " + s"($taskShuffleId, $mapId, $taskAttemptId) to
($shuffleId, $mapId, " +
+ s"$attemptId)")
+ ShuffleTask(shuffleId, mapId, attemptId)
+ }
+
override def receive: PartialFunction[Any, Unit] = {
case RemoveExpiredShuffle =>
removeExpiredShuffle()
@@ -1065,7 +1083,7 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
latestPartitionLocation.remove(shuffleId)
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
-
+ shuffleTaskInfo.remove(shuffleId)
requestUnregisterShuffle(
rssHARetryClient,
UnregisterShuffle(appId, shuffleId, RssHARetryClient.genRequestId()))
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
new file mode 100644
index 00000000..1742ab86
--- /dev/null
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function
+import javax.annotation.concurrent.ThreadSafe
+
+@ThreadSafe
+class ShuffleTaskInfo {
+
+ private var currentShuffleIndex: Int = 0
+ // task shuffle id -> mapId_taskAttemptId -> attemptIdx
+ private val taskShuffleAttemptIdToAttemptId =
+ new ConcurrentHashMap[String, ConcurrentHashMap[String, Int]]
+ // map attemptId index
+ private val taskShuffleAttemptIdIndex =
+ new ConcurrentHashMap[String, ConcurrentHashMap[Int, Int]]
+ // task shuffle id -> celeborn shuffle id
+ private val taskShuffleIdToShuffleId = new ConcurrentHashMap[String, Int]
+ // celeborn shuffle id -> task shuffle id
+ private val shuffleIdToTaskShuffleId = new ConcurrentHashMap[Int, String]
+
+ val newMapFunc: function.Function[String, ConcurrentHashMap[Int, Int]] =
+ new function.Function[String, ConcurrentHashMap[Int, Int]]() {
+ override def apply(s: String): ConcurrentHashMap[Int, Int] = {
+ new ConcurrentHashMap[Int, Int]()
+ }
+ }
+
+ val newMapFunc2: function.Function[String, ConcurrentHashMap[String, Int]] =
+ new function.Function[String, ConcurrentHashMap[String, Int]]() {
+ override def apply(s: String): ConcurrentHashMap[String, Int] = {
+ new ConcurrentHashMap[String, Int]()
+ }
+ }
+
+ def getShuffleId(taskShuffleId: String): Int = {
+ taskShuffleIdToShuffleId.synchronized {
+ if (taskShuffleIdToShuffleId.containsKey(taskShuffleId)) {
+ taskShuffleIdToShuffleId.get(taskShuffleId)
+ } else {
+ taskShuffleIdToShuffleId.put(taskShuffleId, currentShuffleIndex)
+ shuffleIdToTaskShuffleId.put(currentShuffleIndex, taskShuffleId)
+ val tempShuffleIndex = currentShuffleIndex
+ currentShuffleIndex = currentShuffleIndex + 1
+ tempShuffleIndex
+ }
+ }
+ }
+
+ def getAttemptId(taskShuffleId: String, mapId: Int, attemptId: String): Int
= {
+ val attemptIndex =
taskShuffleAttemptIdIndex.computeIfAbsent(taskShuffleId, newMapFunc)
+ val attemptIdMap =
+ taskShuffleAttemptIdToAttemptId.computeIfAbsent(taskShuffleId,
newMapFunc2)
+ val mapAttemptId = mapId + "_" + attemptId
+ attemptIndex.synchronized {
+ if (!attemptIdMap.containsKey(mapAttemptId)) {
+ if (attemptIndex.containsKey(mapId)) {
+ val index = attemptIndex.get(mapId)
+ attemptIdMap.put(mapAttemptId, index + 1)
+ attemptIndex.put(mapId, index + 1)
+ } else {
+ attemptIdMap.put(mapAttemptId, 0)
+ attemptIndex.put(mapId, 0)
+ }
+ }
+ }
+
+ attemptIdMap.get(mapAttemptId)
+ }
+
+ def remove(shuffleId: Int): Unit = {
+ val taskShuffleId = shuffleIdToTaskShuffleId.remove(shuffleId)
+ taskShuffleIdToShuffleId.remove(taskShuffleId)
+    taskShuffleAttemptIdIndex.remove(taskShuffleId)
+ taskShuffleAttemptIdToAttemptId.remove(taskShuffleId)
+ }
+}
diff --git
a/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
new file mode 100644
index 00000000..abe2e5f5
--- /dev/null
+++
b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class ShuffleTaskInfoSuite extends AnyFunSuite {
+
+ test("encode shuffle id & map attemptId") {
+ val shuffleTaskInfo = new ShuffleTaskInfo
+ val encodeShuffleId = shuffleTaskInfo.getShuffleId("shuffleId")
+ assert(encodeShuffleId == 0)
+
+ // another shuffle
+ val encodeShuffleId1 = shuffleTaskInfo.getShuffleId("shuffleId1")
+ assert(encodeShuffleId1 == 1)
+
+ val encodeShuffleId0 = shuffleTaskInfo.getShuffleId("shuffleId")
+ assert(encodeShuffleId0 == 0)
+
+ val encodeAttemptId011 = shuffleTaskInfo.getAttemptId("shuffleId", 1,
"attempt1")
+ val encodeAttemptId112 = shuffleTaskInfo.getAttemptId("shuffleId1", 1,
"attempt2")
+ val encodeAttemptId021 = shuffleTaskInfo.getAttemptId("shuffleId", 2,
"attempt1")
+ val encodeAttemptId012 = shuffleTaskInfo.getAttemptId("shuffleId", 1,
"attempt2")
+ assert(encodeAttemptId011 == 0)
+ assert(encodeAttemptId112 == 0)
+ assert(encodeAttemptId021 == 0)
+ assert(encodeAttemptId012 == 1)
+
+ // remove shuffleId and reEncode
+ shuffleTaskInfo.remove(encodeShuffleId)
+ val encodeShuffleIdNew = shuffleTaskInfo.getShuffleId("shuffleId")
+ assert(encodeShuffleIdNew == 2)
+ }
+}
diff --git a/dev/reformat b/dev/reformat
index 2d5ba343..486af06c 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -23,3 +23,5 @@ RSS_HOME="$(cd "`dirname "$0"`/.."; pwd)"
${RSS_HOME}/build/mvn spotless:apply -Pspark-2.4
${RSS_HOME}/build/mvn spotless:apply -Pspark-3.1
+
+${RSS_HOME}/build/mvn spotless:apply -Pflink-1.14
diff --git a/pom.xml b/pom.xml
index 7d68e396..829a2469 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,6 @@
<module>service</module>
<module>master</module>
<module>worker</module>
- <module>tests/spark-it</module>
</modules>
<distributionManagement>
@@ -84,6 +83,9 @@
<slf4j.version>1.7.36</slf4j.version>
<roaringbitmap.version>0.9.32</roaringbitmap.version>
<snakeyaml.version>1.30</snakeyaml.version>
+ <!-- default flink version -->
+ <flink.version>1.14.0</flink.version>
+
<!-- default hadoop version -->
<hadoop.version>3.2.1</hadoop.version>
<shading.prefix>org.apache.celeborn.shaded</shading.prefix>
@@ -141,6 +143,25 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.typesafe</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
@@ -851,6 +872,7 @@
<module>client-spark/common</module>
<module>client-spark/spark-2</module>
<module>client-spark/spark-2-shaded</module>
+ <module>tests/spark-it</module>
</modules>
<properties>
<jackson.version>2.6.7</jackson.version>
@@ -881,6 +903,7 @@
<module>client-spark/common</module>
<module>client-spark/spark-3</module>
<module>client-spark/spark-3-shaded</module>
+ <module>tests/spark-it</module>
</modules>
<properties>
<jackson.version>2.10.0</jackson.version>
@@ -899,6 +922,7 @@
<module>client-spark/common</module>
<module>client-spark/spark-3</module>
<module>client-spark/spark-3-shaded</module>
+ <module>tests/spark-it</module>
</modules>
<properties>
<jackson.version>2.10.0</jackson.version>
@@ -917,6 +941,7 @@
<module>client-spark/common</module>
<module>client-spark/spark-3</module>
<module>client-spark/spark-3-shaded</module>
+ <module>tests/spark-it</module>
</modules>
<properties>
<jackson.version>2.12.3</jackson.version>
@@ -935,6 +960,7 @@
<module>client-spark/common</module>
<module>client-spark/spark-3</module>
<module>client-spark/spark-3-shaded</module>
+ <module>tests/spark-it</module>
</modules>
<properties>
<jackson.version>2.13.4</jackson.version>
@@ -947,6 +973,23 @@
</properties>
</profile>
+ <profile>
+ <id>flink-1.14</id>
+ <modules>
+ <module>client-flink/flink-1.14</module>
+ <module>client-flink/flink-shaded</module>
+ </modules>
+ <properties>
+ <jackson.version>2.6.7</jackson.version>
+ <jackson.databind.version>2.6.7.3</jackson.databind.version>
+ <lz4-java.version>1.4.0</lz4-java.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <zstd-jni.version>1.4.4-3</zstd-jni.version>
+ <scala.version>2.12.15</scala.version>
+ <flink.version>1.14.0</flink.version>
+ </properties>
+ </profile>
+
<profile>
<id>google-mirror</id>
<properties>
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
index ea7e7fe2..4e857e17 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
@@ -24,7 +24,7 @@ import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
-import org.apache.celeborn.client.{LifecycleManager, ShuffleClient,
ShuffleClientImpl}
+import org.apache.celeborn.client.{LifecycleManager, ShuffleClientImpl}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.util.PackedPartitionId