This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 132715785 Reuse zkclient in BestPossibleExternalViewVerifier and fix 
resource leak (#2180)
132715785 is described below

commit 132715785e90803ad8991da491f4621db1668fb8
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Mon Aug 1 10:49:12 2022 -0700

    Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak 
(#2180)
    
    Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak
    
    Reuse zkclient in verifier and improve resource closure logic to avoid 
resource leak.
---
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  |  5 ---
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 39 ++++++++++---------
 .../BestPossibleExternalViewVerifier.java          | 44 ++++++++++------------
 .../ClusterVerifiers/ZkHelixClusterVerifier.java   |  7 +++-
 .../apache/helix/rest/server/ServerContext.java    |  2 +-
 5 files changed, 46 insertions(+), 51 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index d1075d47b..e94148e99 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -88,10 +88,5 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer 
{
       _bestPossibleAssignment = bestPossibleAssignment;
       return true;
     }
-
-    @Override
-    // BucketDataAccessor will be reused, won't be closed here.
-    public void close() {
-    }
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 521e3d720..c20b3f902 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -41,12 +41,13 @@ import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
+import 
org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.slf4j.Logger;
@@ -75,10 +76,11 @@ public class ZkBucketDataAccessor implements 
BucketDataAccessor, AutoCloseable {
 
   private final int _bucketSize;
   private final long _versionTTLms;
-  private ZkSerializer _zkSerializer;
-  private RealmAwareZkClient _zkClient;
-  private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
-  private Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
+  private final ZkSerializer _zkSerializer;
+  private final RealmAwareZkClient _zkClient;
+  private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
+  private final Map<String, ScheduledFuture> _gcTaskFutureMap = new 
HashMap<>();
+  private boolean _usesExternalZkClient = false;
 
   /**
    * Constructor that allows a custom bucket size.
@@ -87,25 +89,21 @@ public class ZkBucketDataAccessor implements 
BucketDataAccessor, AutoCloseable {
    * @param versionTTLms in ms
    */
   public ZkBucketDataAccessor(String zkAddr, int bucketSize, long 
versionTTLms) {
-    _zkClient = createRealmAwareZkClient(zkAddr);
-    _zkClient.setZkSerializer(new ZkSerializer() {
-      @Override
-      public byte[] serialize(Object data) throws ZkMarshallingError {
-        if (data instanceof byte[]) {
-          return (byte[]) data;
-        }
-        throw new HelixException("ZkBucketDataAccesor only supports a byte 
array as an argument!");
-      }
+    this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false);
+  }
 
-      @Override
-      public Object deserialize(byte[] data) throws ZkMarshallingError {
-        return data;
-      }
-    });
+  public ZkBucketDataAccessor(RealmAwareZkClient zkClient) {
+    this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
+  }
+
+  private ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, 
long versionTTLms,
+      boolean usesExternalZkClient) {
+    _zkClient = zkClient;
     _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
     _versionTTLms = versionTTLms;
+    _usesExternalZkClient = usesExternalZkClient;
   }
 
   /**
@@ -135,6 +133,7 @@ public class ZkBucketDataAccessor implements 
BucketDataAccessor, AutoCloseable {
       zkClient = DedicatedZkClientFactory.getInstance()
           .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     }
+    zkClient.setZkSerializer(new ByteArraySerializer());
     return zkClient;
   }
 
@@ -258,7 +257,7 @@ public class ZkBucketDataAccessor implements 
BucketDataAccessor, AutoCloseable {
 
   @Override
   public void disconnect() {
-    if (!_zkClient.isClosed()) {
+    if (!_usesExternalZkClient && _zkClient != null && !_zkClient.isClosed()) {
       _zkClient.close();
     }
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 3b133a138..1997bea06 100644
--- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.common.PartitionStateMap;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
 import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
@@ -91,6 +92,7 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
     _resources = resources;
     _expectLiveInstances = expectLiveInstances;
     _dataProvider = new ResourceControllerDataProvider();
+    // _zkClient should be closed with BestPossibleExternalViewVerifier
   }
 
   /**
@@ -105,7 +107,7 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
   public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String 
clusterName,
       Set<String> resources, Map<String, Map<String, String>> errStates,
       Set<String> expectLiveInstances) {
-    this(zkClient, clusterName, resources, errStates, expectLiveInstances, 0);
+    this(zkClient, clusterName, errStates, resources, expectLiveInstances, 0, 
true);
   }
 
   @Deprecated
@@ -114,11 +116,7 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
       Set<String> expectLiveInstances, int waitTillVerify) {
     // usesExternalZkClient = true because ZkClient is given by the caller
     // at close(), we will not close this ZkClient because it might be being 
used elsewhere
-    super(zkClient, clusterName, true, waitTillVerify);
-    _errStates = errStates;
-    _resources = resources;
-    _expectLiveInstances = expectLiveInstances;
-    _dataProvider = new ResourceControllerDataProvider();
+    this(zkClient, clusterName, errStates, resources, expectLiveInstances, 
waitTillVerify, true);
   }
 
   private BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String 
clusterName,
@@ -144,7 +142,6 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
     private Set<String> _resources;
     private Set<String> _expectLiveInstances;
     private RealmAwareZkClient _zkClient;
-    private boolean _usesExternalZkClient = false; // false by default
 
     public Builder(String clusterName) {
       _clusterName = clusterName;
@@ -155,11 +152,12 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
         throw new IllegalArgumentException("Cluster name is missing!");
       }
 
+      // _usesExternalZkClient == true
       if (_zkClient != null) {
-        return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, 
_resources, _errStates,
-            _expectLiveInstances, _waitPeriodTillVerify);
+        return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, 
_errStates, _resources,
+            _expectLiveInstances, _waitPeriodTillVerify, true);
       }
-
+      // _usesExternalZkClient == false
       if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig 
== null) {
         // For backward-compatibility
         return new BestPossibleExternalViewVerifier(_zkAddress, _clusterName, 
_resources,
@@ -170,7 +168,7 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
       return new BestPossibleExternalViewVerifier(
           createZkClient(RealmAwareZkClient.RealmMode.SINGLE_REALM, 
_realmAwareZkConnectionConfig,
               _realmAwareZkClientConfig, _zkAddress), _clusterName, 
_errStates, _resources,
-          _expectLiveInstances, _waitPeriodTillVerify, _usesExternalZkClient);
+          _expectLiveInstances, _waitPeriodTillVerify, false);
     }
 
     public String getClusterName() {
@@ -210,7 +208,6 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
 
     public Builder setZkClient(RealmAwareZkClient zkClient) {
       _zkClient = zkClient;
-      _usesExternalZkClient = true; // Set the flag since external ZkClient is 
used
       return this;
     }
   }
@@ -435,18 +432,15 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
 
     RebalanceUtil.runStage(event, new CurrentStateComputationStage());
     // Note the readOnlyWagedRebalancer is just for one time usage
-    DryrunWagedRebalancer dryrunWagedRebalancer =
-        new DryrunWagedRebalancer(_zkClient.getServers(), 
cache.getClusterName(),
-            cache.getClusterConfig().getGlobalRebalancePreference());
-    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), 
dryrunWagedRebalancer);
-    try {
+
+    try (ZkBucketDataAccessor zkBucketDataAccessor = new 
ZkBucketDataAccessor(_zkClient);
+        DryrunWagedRebalancer dryrunWagedRebalancer = new 
DryrunWagedRebalancer(zkBucketDataAccessor,
+            cache.getClusterName(), 
cache.getClusterConfig().getGlobalRebalancePreference())) {
+      event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), 
dryrunWagedRebalancer);
       RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
-    } finally {
-      dryrunWagedRebalancer.close();
     }
 
-    BestPossibleStateOutput output = 
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
-    return output;
+    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
   }
 
   @Override
@@ -456,15 +450,17 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + 
"])";
   }
 
+  // TODO: to clean up, finalize is deprecated in Java 9
   @Override
   public void finalize() {
     close();
+    super.finalize();
   }
 
-  private class DryrunWagedRebalancer extends 
org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
-    public DryrunWagedRebalancer(String metadataStoreAddress, String 
clusterName,
+  private static class DryrunWagedRebalancer extends ReadOnlyWagedRebalancer 
implements AutoCloseable {
+    public DryrunWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, 
String clusterName,
         Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-      super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName, 
preferences);
+      super(zkBucketDataAccessor, clusterName, preferences);
     }
 
     @Override
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 623a91ecd..11071d3ee 100644
--- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class ZkHelixClusterVerifier
-    implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
+    implements IZkChildListener, IZkDataListener, HelixClusterVerifier, 
AutoCloseable {
   private static Logger LOG = 
LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
   protected static int DEFAULT_TIMEOUT = 300 * 1000;
   protected static int DEFAULT_PERIOD = 500;
@@ -229,6 +229,11 @@ public abstract class ZkHelixClusterVerifier
     return verifyByPolling(DEFAULT_TIMEOUT, DEFAULT_PERIOD);
   }
 
+  /**
+   * Implement close() for {@link AutoCloseable} and {@link 
HelixClusterVerifier}.
+   * Non-external resources should be closed in this method to prevent 
resource leak.
+   */
+  @Override
   public void close() {
     if (_zkClient != null && !_usesExternalZkClient) {
       _zkClient.close();
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java 
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index a1cfb6695..27921f6e2 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -285,7 +285,7 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
     if (_zkBucketDataAccessor == null) {
       synchronized (this) {
         if (_zkBucketDataAccessor == null) {
-          _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);
+          _zkBucketDataAccessor = new 
ZkBucketDataAccessor(getByteArrayRealmAwareZkClient());
         }
       }
     }

Reply via email to