advancedxy commented on code in PR #1838:
URL: https://github.com/apache/incubator-uniffle/pull/1838#discussion_r1687352014


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java:
##########
@@ -52,8 +51,7 @@ public static class Builder {
     private int shuffleId;
     private int partitionId;
     private int stageAttemptId;
-    private String reportServerHost;
-    private int reportServerPort;
+    private ExpireCloseableSupplier<ShuffleManagerClient> managerClientSupplier;

Review Comment:
   ditto.



##########
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireCloseableSupplier<T extends Closeable> implements Supplier<T>, Serializable {
+  private static final long serialVersionUID = 0;
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireCloseableSupplier.class);
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+  private volatile T t;
+  private final Supplier<T> delegate;
+  private transient volatile long freshTime;
+  private final long delayCloseInterval;
+
+  public ExpireCloseableSupplier(Supplier<T> delegate) {
+    this(delegate, 10000);
+  }
+
+  public ExpireCloseableSupplier(Supplier<T> delegate, long delayCloseInterval) {
+    this.delegate = delegate;
+    this.delayCloseInterval = delayCloseInterval;
+  }
+
+  public synchronized T get() {
+    freshTime = System.currentTimeMillis();
+    if (t == null) {
+      t = delegate.get();
+    }
+    executor.schedule(this::close, delayCloseInterval, TimeUnit.MILLISECONDS);
+    return t;

Review Comment:
   Like the previous comment, this is not a problem with this Supplier itself. But once T is closed externally, there's no way for this Supplier to detect that.
   
   I can think of two ways to solve this problem:
   1. Define a new interface, `HasClosed`, to indicate whether a Closeable object has been closed or not, then implement it for ShuffleManagerClient.
   2. Leave the code as it is and add a new central method in ShuffleManagerClientFactory to return an `ExpiringCloseableSupplier`, which should state clearly that the close method is actually called by the `Supplier`.
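   A minimal sketch of option 1, assuming a hypothetical `HasClosed`-style interface. `RenewingSupplier` and `FakeClient` are illustrative names, not Uniffle classes:

```java
import java.util.function.Supplier;

public class Main {
  // Hypothetical interface from option 1: lets the supplier see external closes.
  interface HasClosed extends java.io.Closeable {
    boolean isClosed();
  }

  static class RenewingSupplier<T extends HasClosed> implements Supplier<T> {
    private final Supplier<T> delegate;
    private volatile T cached;

    RenewingSupplier(Supplier<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public synchronized T get() {
      // Re-fetch when there is no instance yet or it was closed externally.
      if (cached == null || cached.isClosed()) {
        cached = delegate.get();
      }
      return cached;
    }
  }

  // Toy stand-in for ShuffleManagerClient.
  static class FakeClient implements HasClosed {
    private volatile boolean closed;

    @Override
    public void close() {
      closed = true;
    }

    @Override
    public boolean isClosed() {
      return closed;
    }
  }

  public static void main(String[] args) {
    RenewingSupplier<FakeClient> supplier = new RenewingSupplier<>(FakeClient::new);
    FakeClient first = supplier.get();
    first.close();                       // a caller closes the client explicitly
    FakeClient second = supplier.get();  // the supplier notices and hands out a new one
    System.out.println(first != second); // prints "true"
  }
}
```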



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java:
##########
@@ -606,25 +609,28 @@ protected synchronized MutableShuffleHandleInfo getRemoteShuffleHandleInfoWithBl
     RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
         new RssPartitionToShuffleServerRequest(shuffleId);
     RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
-        getOrCreateShuffleManagerClient()
+        getOrCreateShuffleManagerClientWrapper()
+            .get()
             .getPartitionToShufflerServerWithBlockRetry(rssPartitionToShuffleServerRequest);
     MutableShuffleHandleInfo shuffleHandleInfo =
         MutableShuffleHandleInfo.fromProto(rpcPartitionToShufflerServer.getHandle());
     return shuffleHandleInfo;
   }
 
-  // todo: automatic close client when the client is idle to avoid too much connections for spark
-  // driver.
-  protected ShuffleManagerClient getOrCreateShuffleManagerClient() {
-    if (shuffleManagerClient == null) {
+  protected synchronized ExpireCloseableSupplier<ShuffleManagerClient>
+      getOrCreateShuffleManagerClientWrapper() {
+    if (managerClientSupplier == null) {
       RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
       String driver = rssConf.getString("driver.host", "");
       int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
-      this.shuffleManagerClient =
-          ShuffleManagerClientFactory.getInstance()
-              .createShuffleManagerClient(ClientType.GRPC, driver, port);
+      long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS);

Review Comment:
   Could you add more comments to explain your code?
   It seems a bit unintuitive to use the rpc timeout as the delay-close interval.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java:
##########
@@ -346,6 +343,7 @@ public static boolean isStageResubmitSupported() {
   }
 
   public static RssException reportRssFetchFailedException(
+      ExpireCloseableSupplier<ShuffleManagerClient> managerClientSupplier,

Review Comment:
   I think we can simply use `Supplier<ShuffleManagerClient>` here?
   
   The delayed close is an implementation detail and doesn't have to be exposed to the API side.
   
   One defect of this approach is that callers may close the `ShuffleManagerClient` explicitly, which might break how `ExpireCloseableSupplier<ShuffleManagerClient>` works: once it's closed by a caller, it cannot be reused again; however, the supplier has no idea it's already been closed.
   
   For that case, I think we should call out in the comment/javadoc that ShuffleManagerClients are self-closeable.
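   A toy sketch of the suggested signature: the API accepts the general `Supplier` interface, so the delayed-close behavior stays an implementation detail of whatever concrete supplier the caller passes in. The names below are illustrative, not the actual Uniffle API:

```java
import java.util.function.Supplier;

public class Main {
  // The reporting API only needs "give me a client", not "give me an
  // expiring client", so it declares the plain Supplier interface.
  static String report(Supplier<String> managerClientSupplier) {
    return "reported via " + managerClientSupplier.get();
  }

  public static void main(String[] args) {
    // The caller can pass a caching/expiring supplier or a trivial lambda.
    Supplier<String> supplier = () -> "grpc-client";
    System.out.println(report(supplier)); // prints "reported via grpc-client"
  }
}
```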



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java:
##########
@@ -98,19 +91,8 @@ static Builder newBuilder() {
     return new Builder();
   }
 
-  private static ShuffleManagerClient createShuffleManagerClient(String host, int port)
-      throws IOException {
-    ClientType grpc = ClientType.GRPC;
-    // host is passed from spark.driver.bindAddress, which would be set when SparkContext is
-    // constructed.
-    return ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc, host, port);
-  }
-
   private RssException generateFetchFailedIfNecessary(RssFetchFailedException e) {
-    String driver = builder.reportServerHost;
-    int port = builder.reportServerPort;
-    // todo: reuse this manager client if this is a bottleneck.
-    try (ShuffleManagerClient client = createShuffleManagerClient(driver, port)) {
+    try (ShuffleManagerClient client = builder.managerClientSupplier.get()) {

Review Comment:
   This is exactly what I'm referring to in the previous comment. If the client is closed/auto-closed, it's not usable anymore.



##########
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireCloseableSupplier<T extends Closeable> implements Supplier<T>, Serializable {
+  private static final long serialVersionUID = 0;
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireCloseableSupplier.class);
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+  private volatile T t;
+  private final Supplier<T> delegate;
+  private transient volatile long freshTime;
+  private final long delayCloseInterval;
+
+  public ExpireCloseableSupplier(Supplier<T> delegate) {
+    this(delegate, 10000);

Review Comment:
   10000 could be a `private static final long DEFAULT_DELAY`.



##########
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireCloseableSupplier<T extends Closeable> implements Supplier<T>, Serializable {
+  private static final long serialVersionUID = 0;
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireCloseableSupplier.class);
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+  private volatile T t;
+  private final Supplier<T> delegate;
+  private transient volatile long freshTime;
+  private final long delayCloseInterval;
+
+  public ExpireCloseableSupplier(Supplier<T> delegate) {
+    this(delegate, 10000);
+  }
+
+  public ExpireCloseableSupplier(Supplier<T> delegate, long delayCloseInterval) {
+    this.delegate = delegate;
+    this.delayCloseInterval = delayCloseInterval;
+  }
+
+  public synchronized T get() {
+    freshTime = System.currentTimeMillis();
+    if (t == null) {
+      t = delegate.get();
+    }
+    executor.schedule(this::close, delayCloseInterval, TimeUnit.MILLISECONDS);

Review Comment:
   I think it will schedule too many runnables, one per get call.
   
   The scheduling should move to L50-L52, so the close is only scheduled when a new T is fetched.
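   A self-contained sketch of that fix, assuming the close task can re-arm itself while the instance is still fresh. `ExpiringSupplier` below is illustrative, not the Uniffle implementation:

```java
import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class Main {
  static class ExpiringSupplier<T extends Closeable> implements Supplier<T> {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
          Thread th = new Thread(r);
          th.setDaemon(true);
          return th;
        });
    private final Supplier<T> delegate;
    private final long delayCloseIntervalMs;
    private volatile T t;
    private volatile long freshTime;

    ExpiringSupplier(Supplier<T> delegate, long delayCloseIntervalMs) {
      this.delegate = delegate;
      this.delayCloseIntervalMs = delayCloseIntervalMs;
    }

    @Override
    public synchronized T get() {
      freshTime = System.currentTimeMillis();
      if (t == null) {
        t = delegate.get();
        // one close task per created instance instead of one per get() call
        executor.schedule(this::tryClose, delayCloseIntervalMs, TimeUnit.MILLISECONDS);
      }
      return t;
    }

    private synchronized void tryClose() {
      long idle = System.currentTimeMillis() - freshTime;
      if (idle >= delayCloseIntervalMs) {
        try {
          if (t != null) {
            t.close();
          }
        } catch (Exception ignored) {
          // best-effort close
        }
        t = null;
      } else {
        // touched recently: re-arm for the remaining idle time
        executor.schedule(this::tryClose, delayCloseIntervalMs - idle, TimeUnit.MILLISECONDS);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ExpiringSupplier<Closeable> supplier =
        new ExpiringSupplier<>(() -> () -> System.out.println("closed"), 50);
    Closeable first = supplier.get();
    Thread.sleep(200);                 // let the idle timer fire and close the client
    Closeable second = supplier.get(); // a fresh instance after expiry
    System.out.println(first != second);
  }
}
```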



##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -586,6 +587,7 @@ public boolean unregisterShuffle(int shuffleId) {
 
   @Override
   public void stop() {
+    super.stop();

Review Comment:
   So this is a bug that wasn't exposed earlier?



##########
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireCloseableSupplier<T extends Closeable> implements Supplier<T>, Serializable {

Review Comment:
   Let's call it `ExpiringCloseableSupplier`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

