EnricoMi commented on code in PR #1652:
URL: https://github.com/apache/incubator-uniffle/pull/1652#discussion_r1584291069


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.writer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
+import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+
+/** This class is to get the partition assignment for ShuffleWriter. */
+public class TaskAttemptAssignment {
+  private Map<Integer, List<ShuffleServerInfo>> assignment;
+  private boolean mutable = false;
+
+  public TaskAttemptAssignment(long taskAttemptId, ShuffleHandleInfo shuffleHandleInfo) {
+    this.assignment = shuffleHandleInfo.getAvailablePartitionServersForWriter();
+    if (shuffleHandleInfo instanceof MutableShuffleHandleInfo) {
+      this.mutable = true;
+    }
+  }
+
+  /**
+   * Retrieving the partition's current available shuffleServers.
+   *
+   * @param partitionId
+   * @return
+   */
+  public List<ShuffleServerInfo> retrieve(int partitionId) {
+    return assignment.get(partitionId);
+  }
+
+  public void update(ShuffleHandleInfo handle) {
+    if (!mutable) {

Review Comment:
   This is still confusing: `mutable` is derived from the handle given to the 
constructor, but `getAvailablePartitionServersForWriter` is called on the 
handle given to this method. There is no need for `mutable`, as this works 
perfectly fine with `ShuffleHandleInfo`.
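
   A minimal sketch of what the class could look like without `mutable`. 
This is an illustration only, not the PR's code: `HandleInfoSketch` is a 
hypothetical stand-in for `ShuffleHandleInfo`, `String` stands in for 
`ShuffleServerInfo`, and the body of `update` is assumed, since the diff above 
is cut off before it. The constructor delegates to `update`, so both paths 
read the assignment from the handle they are actually given.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ShuffleHandleInfo; only the one method used here. */
interface HandleInfoSketch {
  Map<Integer, List<String>> getAvailablePartitionServersForWriter();
}

/** Sketch of TaskAttemptAssignment with the `mutable` flag removed. */
final class TaskAttemptAssignmentSketch {
  private Map<Integer, List<String>> assignment;

  TaskAttemptAssignmentSketch(long taskAttemptId, HandleInfoSketch handle) {
    // Same code path as later updates, so no flag is derived from the first handle.
    this.update(handle);
  }

  /** Returns the servers currently assigned to the partition, or null if unknown. */
  List<String> retrieve(int partitionId) {
    return assignment.get(partitionId);
  }

  /** Assumed behavior: replace the assignment with whatever the given handle reports. */
  void update(HandleInfoSketch handle) {
    if (handle == null) {
      throw new IllegalArgumentException("handle must not be null");
    }
    this.assignment = handle.getAvailablePartitionServersForWriter();
  }
}
```

   Whether a handle can change over time then stays a property of the handle 
implementation, and this class does not need to know about it.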



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/handle/ShuffleHandleInfoBase.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.handle;
+
+import java.io.Serializable;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+
+public abstract class ShuffleHandleInfoBase implements ShuffleHandleInfo, Serializable {

Review Comment:
   I like `BaseShuffleHandleInfo`, but a class name like `{Interface}Base` is 
common practice for a base implementation of an interface `{Interface}`.
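
   For illustration, the `{Interface}Base` convention looks roughly like this. 
All names are hypothetical and unrelated to Uniffle; `Greeter` merely plays 
the role of the interface.

```java
/** Hypothetical interface, playing the role of {Interface}. */
interface Greeter {
  String name();

  String greet();
}

/** Base implementation named {Interface}Base: holds shared state and behavior. */
abstract class GreeterBase implements Greeter {
  private final String name;

  protected GreeterBase(String name) {
    this.name = name;
  }

  @Override
  public String name() {
    return name;
  }
}

/** Concrete implementation, named after what it implements. */
class EnglishGreeter extends GreeterBase {
  EnglishGreeter(String name) {
    super(name);
  }

  @Override
  public String greet() {
    return "Hello, " + name();
  }
}
```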



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.handle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.proto.RssProtos;
+
+/** This class holds the dynamic partition assignment for partition reassign mechanism. */
+public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase {

Review Comment:
   A class should be named after what it implements, not after how it is 
used. It has no knowledge of how it is being used and does not need any; it 
provides only what it implements.
   
   This handle info is **used** for fault tolerance, but there is no fault 
tolerance **built into** this class. It uses per-replica server infos to 
implement `getAvailablePartitionServersForWriter()` and 
`getAllPartitionServersForReader()`.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.writer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
+import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+
+/** This class is to get the partition assignment for ShuffleWriter. */
+public class TaskAttemptAssignment {
+  private Map<Integer, List<ShuffleServerInfo>> assignment;
+  private boolean mutable = false;
+
+  public TaskAttemptAssignment(long taskAttemptId, ShuffleHandleInfo shuffleHandleInfo) {
+    this.assignment = shuffleHandleInfo.getAvailablePartitionServersForWriter();

Review Comment:
   After removing `mutable` completely (see below), this should be changed to:
   ```suggestion
       this.update(shuffleHandleInfo);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

