This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f3f10487 [CELEBORN-75] Initialize flink plugin module (#1027)
f3f10487 is described below

commit f3f104870c91dec60c546278d3ac7dcb7c59a1d4
Author: Shuang <[email protected]>
AuthorDate: Wed Dec 7 15:53:00 2022 +0800

    [CELEBORN-75] Initialize flink plugin module (#1027)
---
 {client => client-flink/flink-1.14}/pom.xml        |  48 ++----
 .../plugin/flink/FlinkResultPartitionInfo.java     |  52 ++++++
 .../plugin/flink/RemoteShuffleDescriptor.java      |  59 +++++++
 .../celeborn/plugin/flink/RemoteShuffleMaster.java | 148 +++++++++++++++++
 .../plugin/flink/RemoteShuffleResource.java        |  50 ++++++
 .../plugin/flink/RemoteShuffleServiceFactory.java  |  40 +++++
 .../celeborn/plugin/flink/ShuffleResource.java     |  25 +++
 .../plugin/flink/ShuffleResourceDescriptor.java    |  67 ++++++++
 .../celeborn/plugin/flink/utils/FlinkUtils.java    |  55 ++++++
 .../celeborn/plugin/flink/utils/ThreadUtils.java   |  48 ++++++
 .../plugin/flink/RemoteShuffleMasterTest.java      | 185 +++++++++++++++++++++
 client-flink/flink-shaded/pom.xml                  | 123 ++++++++++++++
 client/pom.xml                                     |   5 +
 .../apache/celeborn/client/LifecycleManager.scala  |  20 ++-
 .../apache/celeborn/client/ShuffleTaskInfo.scala   |  95 +++++++++++
 .../celeborn/client/ShuffleTaskInfoSuite.scala     |  50 ++++++
 dev/reformat                                       |   2 +
 pom.xml                                            |  45 ++++-
 .../celeborn/tests/client/ShuffleClientSuite.scala |   2 +-
 19 files changed, 1081 insertions(+), 38 deletions(-)

diff --git a/client/pom.xml b/client-flink/flink-1.14/pom.xml
similarity index 60%
copy from client/pom.xml
copy to client-flink/flink-1.14/pom.xml
index 45e8e7df..4aa5b01e 100644
--- a/client/pom.xml
+++ b/client-flink/flink-1.14/pom.xml
@@ -17,58 +17,36 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-
   <parent>
     <groupId>org.apache.celeborn</groupId>
     <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
     <version>${project.version}</version>
-    <relativePath>../pom.xml</relativePath>
+    <relativePath>../../pom.xml</relativePath>
   </parent>
 
-  <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+  
<artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
   <packaging>jar</packaging>
-  <name>Celeborn Client</name>
+  <name>Celeborn Client for flink</name>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
       <artifactId>celeborn-common_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-all</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.lz4</groupId>
-      <artifactId>lz4-java</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.github.luben</groupId>
-      <artifactId>zstd-jni</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-      <scope>test</scope>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-1.2-api</artifactId>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
new file mode 100644
index 00000000..588d0c7d
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+/**
+ * Adapter that wraps Flink's {@link PartitionDescriptor} and {@link ProducerDescriptor}
+ * and derives from them the identifiers Celeborn uses for one result partition.
+ */
+public class FlinkResultPartitionInfo {
+  private final PartitionDescriptor partitionDescriptor;
+  private final ProducerDescriptor producerDescriptor;
+
+  public FlinkResultPartitionInfo(
+      PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+    this.partitionDescriptor = partitionDescriptor;
+    this.producerDescriptor = producerDescriptor;
+  }
+
+  /** Rebuilds the Flink ResultPartitionID from the partition id and producer execution id. */
+  public ResultPartitionID getResultPartitionId() {
+    return new ResultPartitionID(
+        partitionDescriptor.getPartitionId(), 
producerDescriptor.getProducerExecutionId());
+  }
+
+  /** Celeborn shuffle id, derived from the intermediate data set id (string form). */
+  public String getShuffleId() {
+    return FlinkUtils.toShuffleId(partitionDescriptor.getResultId());
+  }
+
+  /** Task id within the shuffle: the partition number of the produced partition. */
+  public int getTaskId() {
+    return partitionDescriptor.getPartitionId().getPartitionNumber();
+  }
+
+  /** Attempt id, derived from the producer's execution attempt id (string form). */
+  public String getAttemptId() {
+    return FlinkUtils.toAttemptId(producerDescriptor.getProducerExecutionId());
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleDescriptor.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleDescriptor.java
new file mode 100644
index 00000000..12db3495
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.util.Optional;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+
+/**
+ * Flink {@link ShuffleDescriptor} describing a result partition whose data is kept in the
+ * remote Celeborn cluster rather than on a TaskExecutor. Carries the owning job id and the
+ * allocated {@link ShuffleResource}.
+ */
+public class RemoteShuffleDescriptor implements ShuffleDescriptor {
+
+  private final ResultPartitionID resultPartitionID;
+
+  private final JobID jobID;
+
+  private final ShuffleResource shuffleResource;
+
+  public RemoteShuffleDescriptor(
+      JobID jobID, ResultPartitionID resultPartitionID, ShuffleResource 
shuffleResource) {
+    this.jobID = jobID;
+    this.resultPartitionID = resultPartitionID;
+    this.shuffleResource = shuffleResource;
+  }
+
+  @Override
+  public ResultPartitionID getResultPartitionID() {
+    return resultPartitionID;
+  }
+
+  public JobID getJobID() {
+    return jobID;
+  }
+
+  public ShuffleResource getShuffleResource() {
+    return shuffleResource;
+  }
+
+  /** Always empty: the shuffle data is stored remotely, not on any local TaskExecutor. */
+  @Override
+  public Optional<ResourceID> storesLocalResourcesOn() {
+    return Optional.empty();
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
new file mode 100644
index 00000000..222c0516
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+import org.apache.celeborn.plugin.flink.utils.ThreadUtils;
+
+public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescriptor> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemoteShuffleMaster.class);
+
+  private final ShuffleMasterContext shuffleMasterContext;
+  // Flink JobId -> Celeborn LifecycleManager
+  private Map<JobID, LifecycleManager> lifecycleManagers = new 
ConcurrentHashMap<>();
+  private final ScheduledThreadPoolExecutor executor =
+      new ScheduledThreadPoolExecutor(
+          1,
+          ThreadUtils.createFactoryWithDefaultExceptionHandler(
+              "remote-shuffle-master-executor", LOG));
+
+  public RemoteShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
+    this.shuffleMasterContext = shuffleMasterContext;
+  }
+
+  @Override
+  public void registerJob(JobShuffleContext context) {
+    JobID jobId = context.getJobId();
+    Future<?> submit =
+        executor.submit(
+            () -> {
+              if (lifecycleManagers.containsKey(jobId)) {
+                throw new RuntimeException("Duplicated registration job: " + 
jobId);
+              } else {
+                CelebornConf celebornConf =
+                    
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
+                LifecycleManager lifecycleManager =
+                    new LifecycleManager(FlinkUtils.toCelebornAppId(jobId), 
celebornConf);
+                lifecycleManagers.put(jobId, lifecycleManager);
+              }
+            });
+
+    try {
+      submit.get();
+    } catch (InterruptedException e) {
+      LOG.error("Encounter interruptedException when registration job: {}.", 
jobId, e);
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      LOG.error("Encounter an error when registration job: {}.", jobId, e);
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  @Override
+  public void unregisterJob(JobID jobID) {
+    executor.execute(
+        () -> {
+          try {
+            LOG.info("Unregister job: {}.", jobID);
+            LifecycleManager lifecycleManager = 
lifecycleManagers.remove(jobID);
+            if (lifecycleManager != null) {
+              lifecycleManager.stop();
+            }
+          } catch (Throwable throwable) {
+            LOG.error("Encounter an error when unregistering job: {}.", jobID, 
throwable);
+          }
+        });
+  }
+
+  @Override
+  public CompletableFuture<RemoteShuffleDescriptor> 
registerPartitionWithProducer(
+      JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+    CompletableFuture<RemoteShuffleDescriptor> completableFuture =
+        CompletableFuture.supplyAsync(
+            () -> {
+              LifecycleManager lifecycleManager = lifecycleManagers.get(jobID);
+              FlinkResultPartitionInfo resultPartitionInfo =
+                  new FlinkResultPartitionInfo(partitionDescriptor, 
producerDescriptor);
+              LifecycleManager.ShuffleTask shuffleTask =
+                  lifecycleManager.encodeExternalShuffleTask(
+                      resultPartitionInfo.getShuffleId(),
+                      resultPartitionInfo.getTaskId(),
+                      resultPartitionInfo.getAttemptId());
+              ShuffleResourceDescriptor shuffleResourceDescriptor =
+                  new ShuffleResourceDescriptor(shuffleTask);
+              RemoteShuffleResource remoteShuffleResource =
+                  new RemoteShuffleResource(
+                      lifecycleManager.getRssMetaServiceHost(),
+                      lifecycleManager.getRssMetaServicePort(),
+                      shuffleResourceDescriptor);
+              return new RemoteShuffleDescriptor(
+                  jobID, resultPartitionInfo.getResultPartitionId(), 
remoteShuffleResource);
+            },
+            executor);
+
+    return completableFuture;
+  }
+
+  @Override
+  public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
+    // TODO
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      for (LifecycleManager lifecycleManager : lifecycleManagers.values()) {
+        lifecycleManager.stop();
+      }
+    } catch (Exception e) {
+      LOG.warn("Encounter exception when shutdown: " + e.getMessage(), e);
+    }
+
+    ThreadUtils.shutdownExecutors(10, executor);
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResource.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResource.java
new file mode 100644
index 00000000..ad8eef06
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+/**
+ * {@link ShuffleResource} implementation holding the Celeborn meta service endpoint plus
+ * the {@link ShuffleResourceDescriptor} identifying the allocated shuffle data.
+ */
+public class RemoteShuffleResource implements ShuffleResource {
+
+  private static final long serialVersionUID = 6497939083185255973L;
+
+  private final String rssMetaServiceHost;
+  private final int rssMetaServicePort;
+
+  // NOTE(review): only assigned in the constructor — could be final like the fields above.
+  private ShuffleResourceDescriptor shuffleResourceDescriptor;
+
+  public RemoteShuffleResource(
+      String rssMetaServiceHost,
+      int rssMetaServicePort,
+      ShuffleResourceDescriptor remoteShuffleDescriptor) {
+    this.rssMetaServiceHost = rssMetaServiceHost;
+    this.rssMetaServicePort = rssMetaServicePort;
+    this.shuffleResourceDescriptor = remoteShuffleDescriptor;
+  }
+
+  @Override
+  public ShuffleResourceDescriptor getMapPartitionShuffleDescriptor() {
+    return shuffleResourceDescriptor;
+  }
+
+  public String getRssMetaServiceHost() {
+    return rssMetaServiceHost;
+  }
+
+  public int getRssMetaServicePort() {
+    return rssMetaServicePort;
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
new file mode 100644
index 00000000..6294f3da
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.*;
+
+public class RemoteShuffleServiceFactory
+    implements ShuffleServiceFactory<
+        RemoteShuffleDescriptor, ResultPartitionWriter, IndexedInputGate> {
+
+  @Override
+  public ShuffleMaster<RemoteShuffleDescriptor> createShuffleMaster(
+      ShuffleMasterContext shuffleMasterContext) {
+    return new RemoteShuffleMaster(shuffleMasterContext);
+  }
+
+  @Override
+  public ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> 
createShuffleEnvironment(
+      ShuffleEnvironmentContext shuffleEnvironmentContext) {
+    // TODO
+    return null;
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResource.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResource.java
new file mode 100644
index 00000000..2a60a75e
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.Serializable;
+
+/**
+ * Serializable description of the remote shuffle resources allocated for one result
+ * partition; shipped to tasks inside the shuffle descriptor.
+ */
+public interface ShuffleResource extends Serializable {
+
+  /** Returns the descriptor of the allocated map-partition shuffle resources. */
+  ShuffleResourceDescriptor getMapPartitionShuffleDescriptor();
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceDescriptor.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceDescriptor.java
new file mode 100644
index 00000000..050f8b83
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceDescriptor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.Serializable;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.util.PackedPartitionId;
+
+public class ShuffleResourceDescriptor implements Serializable {
+
+  private static final long serialVersionUID = -1251659747395561342L;
+
+  private final int shuffleId;
+  private final int mapId;
+  private final int attemptId;
+  private final int partitionId;
+
+  public ShuffleResourceDescriptor(LifecycleManager.ShuffleTask shuffleTask) {
+    this.shuffleId = shuffleTask.shuffleId();
+    this.mapId = shuffleTask.mapId();
+    this.attemptId = shuffleTask.attemptId();
+    this.partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
+  }
+
+  public int getShuffleId() {
+    return shuffleId;
+  }
+
+  public int getMapId() {
+    return mapId;
+  }
+
+  public int getAttemptId() {
+    return attemptId;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("ShuffleResourceDescriptor{");
+    sb.append("shuffleId=").append(shuffleId);
+    sb.append(", mapId=").append(mapId);
+    sb.append(", attemptId=").append(attemptId);
+    sb.append(", partitionId=").append(partitionId);
+    sb.append('}');
+    return sb.toString();
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
new file mode 100644
index 00000000..1182c726
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.utils;
+
+import java.util.Map;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class FlinkUtils {
+
+  public static CelebornConf toCelebornConf(Configuration configuration) {
+    CelebornConf tmpCelebornConf = new CelebornConf();
+    Map<String, String> confMap = configuration.toMap();
+    for (Map.Entry<String, String> entry : confMap.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith("celeborn.")) {
+        tmpCelebornConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return tmpCelebornConf;
+  }
+
+  public static String toCelebornAppId(JobID jobID) {
+    return jobID.toString();
+  }
+
+  public static String toShuffleId(IntermediateDataSetID dataSetID) {
+    return dataSetID.toString();
+  }
+
+  public static String toAttemptId(ExecutionAttemptID attemptID) {
+    return attemptID.toString();
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/ThreadUtils.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/ThreadUtils.java
new file mode 100644
index 00000000..58e23ddc
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/utils/ThreadUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.util.ExecutorUtils;
+import org.slf4j.Logger;
+
+public class ThreadUtils {
+
+  public static ThreadFactory createFactoryWithDefaultExceptionHandler(
+      final String executorServiceName, final Logger LOG) {
+    return new ThreadFactoryBuilder()
+        .setNameFormat(executorServiceName + "-%d")
+        .setDaemon(true)
+        .setUncaughtExceptionHandler(
+            (Thread t, Throwable e) ->
+                LOG.error(
+                    "exception in serviceName: {}, thread: {}",
+                    executorServiceName,
+                    t.getName(),
+                    e))
+        .build();
+  }
+
+  public static void shutdownExecutors(int timeoutSecs, ExecutorService 
executorService) {
+    ExecutorUtils.gracefulShutdown(timeoutSecs, TimeUnit.SECONDS, 
executorService);
+  }
+}
diff --git 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
new file mode 100644
index 00000000..20410052
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.util.PackedPartitionId;
+
/**
 * Unit tests for {@code RemoteShuffleMaster}: job registration/unregistration and the
 * shuffle/partition/attempt/map id encoding produced by {@code registerPartitionWithProducer}.
 */
public class RemoteShuffleMasterTest {

  private static final Logger LOG = LoggerFactory.getLogger(RemoteShuffleMasterTest.class);
  // A fresh master is built in setUp() and closed in tearDown() for each test.
  private RemoteShuffleMaster remoteShuffleMaster;

  @Before
  public void setUp() {
    Configuration configuration = new Configuration();
    remoteShuffleMaster = createShuffleMaster(configuration);
  }

  @Test
  public void testRegisterJob() {
    JobShuffleContext jobShuffleContext = createJobShuffleContext(JobID.generate());
    remoteShuffleMaster.registerJob(jobShuffleContext);

    // reRunRegister job
    // NOTE(review): this try/catch passes whether or not the duplicate registration throws;
    // if a duplicate registration MUST fail, add Assert.fail() after the call — confirm the
    // intended contract of registerJob.
    try {
      remoteShuffleMaster.registerJob(jobShuffleContext);
    } catch (Exception e) {
      Assert.assertTrue(true);
    }

    // unRegister job
    remoteShuffleMaster.unregisterJob(jobShuffleContext.getJobId());
  }

  @Test
  public void testRegisterPartitionWithProducer()
      throws UnknownHostException, ExecutionException, InterruptedException {
    JobID jobID = JobID.generate();
    JobShuffleContext jobShuffleContext = createJobShuffleContext(jobID);
    remoteShuffleMaster.registerJob(jobShuffleContext);

    IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
    PartitionDescriptor partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 0);
    ProducerDescriptor producerDescriptor = createProducerDescriptor();
    RemoteShuffleDescriptor remoteShuffleDescriptor =
        remoteShuffleMaster
            .registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
            .get();
    ShuffleResource shuffleResource = remoteShuffleDescriptor.getShuffleResource();
    ShuffleResourceDescriptor mapPartitionShuffleDescriptor =
        shuffleResource.getMapPartitionShuffleDescriptor();
    // NOTE(review): prefer LOG.info over System.out for test diagnostics.
    System.out.println(mapPartitionShuffleDescriptor.toString());
    // First registration in the job: all encoded ids start at 0.
    Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
    Assert.assertEquals(0, mapPartitionShuffleDescriptor.getPartitionId());
    Assert.assertEquals(0, mapPartitionShuffleDescriptor.getAttemptId());
    Assert.assertEquals(0, mapPartitionShuffleDescriptor.getMapId());

    // use same dataset id
    // Same dataset keeps the same shuffle id; the new result partition gets the next map id.
    partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 1);
    remoteShuffleDescriptor =
        remoteShuffleMaster
            .registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
            .get();
    mapPartitionShuffleDescriptor =
        remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
    Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
    Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());

    // use another attemptId
    // A new producer for the same partition bumps the attempt index; the partition id is
    // expected to pack (map id, attempt id) — see PackedPartitionId.packedPartitionId.
    producerDescriptor = createProducerDescriptor();
    remoteShuffleDescriptor =
        remoteShuffleMaster
            .registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
            .get();
    mapPartitionShuffleDescriptor =
        remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
    Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
    Assert.assertEquals(
        PackedPartitionId.packedPartitionId(1, 1), mapPartitionShuffleDescriptor.getPartitionId());
    Assert.assertEquals(1, mapPartitionShuffleDescriptor.getAttemptId());
    Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
  }

  @After
  public void tearDown() {
    // Best-effort close; a failure here should not mask the test result.
    if (remoteShuffleMaster != null) {
      try {
        remoteShuffleMaster.close();
      } catch (Exception e) {
        LOG.warn(e.getMessage(), e);
      }
    }
  }

  /** Builds a shuffle master backed by a minimal context; onFatalError exits the JVM. */
  public RemoteShuffleMaster createShuffleMaster(Configuration configuration) {
    remoteShuffleMaster =
        new RemoteShuffleMaster(
            new ShuffleMasterContext() {
              @Override
              public Configuration getConfiguration() {
                return configuration;
              }

              @Override
              public void onFatalError(Throwable throwable) {
                System.exit(-1);
              }
            });

    return remoteShuffleMaster;
  }

  /** Minimal JobShuffleContext stub: fixed job id, no-op partition release. */
  public JobShuffleContext createJobShuffleContext(JobID jobId) {
    return new JobShuffleContext() {
      @Override
      public org.apache.flink.api.common.JobID getJobId() {
        return jobId;
      }

      @Override
      public CompletableFuture<?> stopTrackingAndReleasePartitions(
          Collection<ResultPartitionID> collection) {
        return CompletableFuture.completedFuture(null);
      }
    };
  }

  /** Builds a BLOCKING partition descriptor for the given dataset and partition number. */
  public PartitionDescriptor createPartitionDescriptor(
      IntermediateDataSetID intermediateDataSetId, int partitionNum) {
    IntermediateResultPartitionID intermediateResultPartitionId =
        new IntermediateResultPartitionID(intermediateDataSetId, partitionNum);
    // NOTE(review): 10 / 5 / 1 are positional constructor arguments — confirm their meaning
    // against the PartitionDescriptor constructor in this Flink version.
    return new PartitionDescriptor(
        intermediateDataSetId,
        10,
        intermediateResultPartitionId,
        ResultPartitionType.BLOCKING,
        5,
        1);
  }

  /** Builds a producer descriptor with a fresh execution attempt on the local host. */
  public ProducerDescriptor createProducerDescriptor() throws UnknownHostException {
    ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
    return new ProducerDescriptor(
        ResourceID.generate(), executionAttemptId, InetAddress.getLocalHost(), 100);
  }
}
diff --git a/client-flink/flink-shaded/pom.xml 
b/client-flink/flink-shaded/pom.xml
new file mode 100644
index 00000000..47800c07
--- /dev/null
+++ b/client-flink/flink-shaded/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  
<artifactId>celeborn-client-flink-${flink.version}-shaded_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Shaded Client for flink</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      
<artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <relocations>
+            <relocation>
+              <pattern>com.google.protobuf</pattern>
+              
<shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>com.google.common</pattern>
+              
<shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>io.netty</pattern>
+              <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons</pattern>
+              
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+            </relocation>
+          </relocations>
+          <artifactSet>
+            <includes>
+              <include>org.apache.celeborn:*</include>
+              <include>com.google.protobuf:protobuf-java</include>
+              <include>com.google.guava:guava</include>
+              <include>io.netty:*</include>
+              <include>org.apache.commons:commons-lang3</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+                <exclude>**/log4j.properties</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>${maven.plugin.antrun.version}</version>
+        <executions>
+          <execution>
+            <id>rename-native-library</id>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <target>
+                <echo message="unpacking netty jar"></echo>
+                <unzip dest="${project.build.directory}/unpacked/" 
src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+                <echo message="renaming native epoll library"></echo>
+                <move includeemptydirs="false" 
todir="${project.build.directory}/unpacked/META-INF/native">
+                  <fileset 
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                  <mapper from="libnetty_transport_native_epoll_x86_64.so" 
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so" 
type="glob"></mapper>
+                </move>
+                <move includeemptydirs="false" 
todir="${project.build.directory}/unpacked/META-INF/native">
+                  <fileset 
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                  <mapper from="libnetty_transport_native_epoll_aarch_64.so" to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so" type="glob"></mapper>
+                </move>
+                <echo message="deleting native kqueue library"></echo>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+                <echo message="repackaging netty jar"></echo>
+                <jar basedir="${project.build.directory}/unpacked" 
destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/client/pom.xml b/client/pom.xml
index 45e8e7df..1ad42aeb 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -56,6 +56,11 @@
       <artifactId>commons-lang3</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 3ef99910..0ecce2ce 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -67,6 +67,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) 
extends RpcEndpoin
   val shuffleMapperAttempts = new ConcurrentHashMap[Int, Array[Int]]()
   private val reducerFileGroupsMap =
     new ConcurrentHashMap[Int, Array[Array[PartitionLocation]]]()
+  private val shuffleTaskInfo = new ShuffleTaskInfo()
   // maintain each shuffle's map relation of WorkerInfo and partition location
   val shuffleAllocatedWorkers =
     new ConcurrentHashMap[Int, ConcurrentHashMap[WorkerInfo, 
PartitionLocationInfo]]()
@@ -105,6 +106,11 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     }
   }
 
  // Celeborn-internal identity of a single map task: integer ids encoded from the
  // engine-provided string identifiers by encodeExternalShuffleTask.
  case class ShuffleTask(
      shuffleId: Int,
      mapId: Int,
      attemptId: Int)
+
   // register shuffle request waiting for response
   private val registeringShuffleRequest =
     new ConcurrentHashMap[Int, util.Set[RegisterCallContext]]()
@@ -204,6 +210,18 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     shufflePartitionType.getOrDefault(shuffleId, conf.shufflePartitionType)
   }
 
+  def encodeExternalShuffleTask(
+      taskShuffleId: String,
+      mapId: Int,
+      taskAttemptId: String): ShuffleTask = {
+    val shuffleId = shuffleTaskInfo.getShuffleId(taskShuffleId)
+    val attemptId = shuffleTaskInfo.getAttemptId(taskShuffleId, mapId, 
taskAttemptId)
+    logInfo(
+      s"encode task from " + s"($taskShuffleId, $mapId, $taskAttemptId) to 
($shuffleId, $mapId, " +
+        s"$attemptId)")
+    ShuffleTask(shuffleId, mapId, attemptId)
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     case RemoveExpiredShuffle =>
       removeExpiredShuffle()
@@ -1065,7 +1083,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
         latestPartitionLocation.remove(shuffleId)
         commitManager.removeExpiredShuffle(shuffleId)
         changePartitionManager.removeExpiredShuffle(shuffleId)
-
+        shuffleTaskInfo.remove(shuffleId)
         requestUnregisterShuffle(
           rssHARetryClient,
           UnregisterShuffle(appId, shuffleId, RssHARetryClient.genRequestId()))
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala 
b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
new file mode 100644
index 00000000..1742ab86
--- /dev/null
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function
+import javax.annotation.concurrent.ThreadSafe
+
/**
 * Thread-safe registry mapping engine-side (string) shuffle and attempt identifiers to
 * Celeborn's compact integer identifiers.
 *
 * Celeborn shuffle ids are allocated sequentially per distinct task shuffle id; attempt
 * indices are allocated sequentially per (task shuffle id, map id) pair.
 */
@ThreadSafe
class ShuffleTaskInfo {

  // Next celeborn shuffle id to hand out; guarded by taskShuffleIdToShuffleId's monitor.
  private var currentShuffleIndex: Int = 0
  // task shuffle id -> mapId_taskAttemptId -> attemptIdx
  private val taskShuffleAttemptIdToAttemptId =
    new ConcurrentHashMap[String, ConcurrentHashMap[String, Int]]
  // task shuffle id -> mapId -> highest attempt index handed out so far
  private val taskShuffleAttemptIdIndex =
    new ConcurrentHashMap[String, ConcurrentHashMap[Int, Int]]
  // task shuffle id -> celeborn shuffle id
  private val taskShuffleIdToShuffleId = new ConcurrentHashMap[String, Int]
  // celeborn shuffle id -> task shuffle id (reverse mapping, used by remove)
  private val shuffleIdToTaskShuffleId = new ConcurrentHashMap[Int, String]

  val newMapFunc: function.Function[String, ConcurrentHashMap[Int, Int]] =
    new function.Function[String, ConcurrentHashMap[Int, Int]]() {
      override def apply(s: String): ConcurrentHashMap[Int, Int] = {
        new ConcurrentHashMap[Int, Int]()
      }
    }

  val newMapFunc2: function.Function[String, ConcurrentHashMap[String, Int]] =
    new function.Function[String, ConcurrentHashMap[String, Int]]() {
      override def apply(s: String): ConcurrentHashMap[String, Int] = {
        new ConcurrentHashMap[String, Int]()
      }
    }

  /**
   * Returns the celeborn shuffle id for `taskShuffleId`, allocating the next sequential id
   * on first sight. Idempotent for a given task shuffle id until it is removed.
   */
  def getShuffleId(taskShuffleId: String): Int = {
    taskShuffleIdToShuffleId.synchronized {
      if (taskShuffleIdToShuffleId.containsKey(taskShuffleId)) {
        taskShuffleIdToShuffleId.get(taskShuffleId)
      } else {
        taskShuffleIdToShuffleId.put(taskShuffleId, currentShuffleIndex)
        shuffleIdToTaskShuffleId.put(currentShuffleIndex, taskShuffleId)
        val tempShuffleIndex = currentShuffleIndex
        currentShuffleIndex = currentShuffleIndex + 1
        tempShuffleIndex
      }
    }
  }

  /**
   * Returns the attempt index for (`taskShuffleId`, `mapId`, `attemptId`): 0 for the first
   * attempt seen for a map, then 1, 2, ... for each distinct later attempt id. Idempotent
   * for a given (taskShuffleId, mapId, attemptId) triple.
   */
  def getAttemptId(taskShuffleId: String, mapId: Int, attemptId: String): Int = {
    val attemptIndex = taskShuffleAttemptIdIndex.computeIfAbsent(taskShuffleId, newMapFunc)
    val attemptIdMap =
      taskShuffleAttemptIdToAttemptId.computeIfAbsent(taskShuffleId, newMapFunc2)
    val mapAttemptId = mapId + "_" + attemptId
    attemptIndex.synchronized {
      if (!attemptIdMap.containsKey(mapAttemptId)) {
        if (attemptIndex.containsKey(mapId)) {
          val index = attemptIndex.get(mapId)
          attemptIdMap.put(mapAttemptId, index + 1)
          attemptIndex.put(mapId, index + 1)
        } else {
          attemptIdMap.put(mapAttemptId, 0)
          attemptIndex.put(mapId, 0)
        }
      }
    }

    attemptIdMap.get(mapAttemptId)
  }

  /**
   * Drops all state for the given celeborn shuffle id. No-op when the id is unknown
   * (e.g. already removed).
   */
  def remove(shuffleId: Int): Unit = {
    val taskShuffleId = shuffleIdToTaskShuffleId.remove(shuffleId)
    // Fix: the attempt-index map is keyed by the task shuffle id (String), not the celeborn
    // shuffle id (Int); removing with the Int key leaked that entry forever. Also guard
    // against an unknown id: ConcurrentHashMap.remove(null) throws NullPointerException.
    if (taskShuffleId != null) {
      taskShuffleIdToShuffleId.remove(taskShuffleId)
      taskShuffleAttemptIdIndex.remove(taskShuffleId)
      taskShuffleAttemptIdToAttemptId.remove(taskShuffleId)
    }
  }
}
diff --git 
a/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala 
b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
new file mode 100644
index 00000000..abe2e5f5
--- /dev/null
+++ 
b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+import org.scalatest.funsuite.AnyFunSuite
+
class ShuffleTaskInfoSuite extends AnyFunSuite {

  test("encode shuffle id & map attemptId") {
    val taskInfo = new ShuffleTaskInfo

    // The first task shuffle id seen is encoded as 0, the next as 1, and lookups are stable.
    val firstShuffle = taskInfo.getShuffleId("shuffleId")
    assert(firstShuffle == 0)
    val secondShuffle = taskInfo.getShuffleId("shuffleId1")
    assert(secondShuffle == 1)
    assert(taskInfo.getShuffleId("shuffleId") == 0)

    // Attempt indices are allocated per (shuffle, map) pair, starting at 0.
    assert(taskInfo.getAttemptId("shuffleId", 1, "attempt1") == 0)
    assert(taskInfo.getAttemptId("shuffleId1", 1, "attempt2") == 0)
    assert(taskInfo.getAttemptId("shuffleId", 2, "attempt1") == 0)
    assert(taskInfo.getAttemptId("shuffleId", 1, "attempt2") == 1)

    // After removal, re-registering the same external id allocates a fresh index.
    taskInfo.remove(firstShuffle)
    assert(taskInfo.getShuffleId("shuffleId") == 2)
  }
}
diff --git a/dev/reformat b/dev/reformat
index 2d5ba343..486af06c 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -23,3 +23,5 @@ RSS_HOME="$(cd "`dirname "$0"`/.."; pwd)"
 ${RSS_HOME}/build/mvn spotless:apply -Pspark-2.4
 
 ${RSS_HOME}/build/mvn spotless:apply -Pspark-3.1
+
+${RSS_HOME}/build/mvn spotless:apply -Pflink-1.14
diff --git a/pom.xml b/pom.xml
index 7d68e396..829a2469 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,6 @@
     <module>service</module>
     <module>master</module>
     <module>worker</module>
-    <module>tests/spark-it</module>
   </modules>
 
   <distributionManagement>
@@ -84,6 +83,9 @@
     <slf4j.version>1.7.36</slf4j.version>
     <roaringbitmap.version>0.9.32</roaringbitmap.version>
     <snakeyaml.version>1.30</snakeyaml.version>
+    <!--  default flink version  -->
+    <flink.version>1.14.0</flink.version>
+
     <!--  default hadoop version  -->
     <hadoop.version>3.2.1</hadoop.version>
     <shading.prefix>org.apache.celeborn.shaded</shading.prefix>
@@ -141,6 +143,25 @@
         <artifactId>slf4j-api</artifactId>
         <version>${slf4j.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-runtime</artifactId>
+        <version>${flink.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.typesafe</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
       <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>jul-to-slf4j</artifactId>
@@ -851,6 +872,7 @@
         <module>client-spark/common</module>
         <module>client-spark/spark-2</module>
         <module>client-spark/spark-2-shaded</module>
+        <module>tests/spark-it</module>
       </modules>
       <properties>
         <jackson.version>2.6.7</jackson.version>
@@ -881,6 +903,7 @@
         <module>client-spark/common</module>
         <module>client-spark/spark-3</module>
         <module>client-spark/spark-3-shaded</module>
+        <module>tests/spark-it</module>
       </modules>
       <properties>
         <jackson.version>2.10.0</jackson.version>
@@ -899,6 +922,7 @@
         <module>client-spark/common</module>
         <module>client-spark/spark-3</module>
         <module>client-spark/spark-3-shaded</module>
+        <module>tests/spark-it</module>
       </modules>
       <properties>
         <jackson.version>2.10.0</jackson.version>
@@ -917,6 +941,7 @@
         <module>client-spark/common</module>
         <module>client-spark/spark-3</module>
         <module>client-spark/spark-3-shaded</module>
+        <module>tests/spark-it</module>
       </modules>
       <properties>
         <jackson.version>2.12.3</jackson.version>
@@ -935,6 +960,7 @@
         <module>client-spark/common</module>
         <module>client-spark/spark-3</module>
         <module>client-spark/spark-3-shaded</module>
+        <module>tests/spark-it</module>
       </modules>
       <properties>
         <jackson.version>2.13.4</jackson.version>
@@ -947,6 +973,23 @@
       </properties>
     </profile>
 
+    <profile>
+      <id>flink-1.14</id>
+      <modules>
+        <module>client-flink/flink-1.14</module>
+        <module>client-flink/flink-shaded</module>
+      </modules>
+      <properties>
+        <jackson.version>2.6.7</jackson.version>
+        <jackson.databind.version>2.6.7.3</jackson.databind.version>
+        <lz4-java.version>1.4.0</lz4-java.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <zstd-jni.version>1.4.4-3</zstd-jni.version>
+        <scala.version>2.12.15</scala.version>
+        <flink.version>1.14.0</flink.version>
+      </properties>
+    </profile>
+
     <profile>
       <id>google-mirror</id>
       <properties>
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
index ea7e7fe2..4e857e17 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
@@ -24,7 +24,7 @@ import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 
-import org.apache.celeborn.client.{LifecycleManager, ShuffleClient, 
ShuffleClientImpl}
+import org.apache.celeborn.client.{LifecycleManager, ShuffleClientImpl}
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.util.PackedPartitionId

Reply via email to